This is an automated email from the ASF dual-hosted git repository.
imbajin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new f69ca66cd fix(server): align cache event actions in legacy EventHub path (#3017)
f69ca66cd is described below
commit f69ca66cd6c7e1e466cd6f3f28fc843b0ea64409
Author: Davide Polato <[email protected]>
AuthorDate: Fri May 15 05:25:33 2026 +0200
fix(server): align cache event actions in legacy EventHub path (#3017)
- Align legacy cache invalidation producers and listeners on ACTION_INVALID
  and ACTION_CLEAR, removing the obsolete ACTION_INVALIDED/ACTION_CLEARED
  constants.
- Add EventHub.notifyExcept(...) so cache transactions and the cache notifier
  bridge can avoid re-processing their own local listener while still
  delivering events to other listeners.
- Track registered graph/schema cache listeners per graph so notifyExcept(...)
  uses the listener instance actually registered on the EventHub, including
  multi-transaction cases where later transactions reuse the first listener.
- Update cache notifier forwarding to prevent local RPC bridge loops after
  action names are unified.
- Add regression coverage for notifyExcept semantics, graph/schema action
  names, listener teardown/re-registration, and notifier no-loop behavior.
- The holder keeps the EventHub listener registered while any transaction for
  the graph is alive, and unregisters/removes it only when the last
  transaction releases it. The registry update, ref-count decrement, and hub
  unlisten now run inside ConcurrentMap.compute() to avoid owner-closes-first
  invalidation gaps.
  Also add graph/schema regression coverage for owner-first close and
  last-close cleanup, including graph close/reopen handling for stale EventHub
  holders.
---
.../java/org/apache/hugegraph/event/EventHub.java | 26 ++-
.../apache/hugegraph/unit/event/EventHubTest.java | 37 +++
.../org/apache/hugegraph/StandardHugeGraph.java | 13 +-
.../org/apache/hugegraph/backend/cache/Cache.java | 2 -
.../backend/cache/CacheListenerHolder.java | 41 ++++
.../backend/cache/CachedGraphTransaction.java | 75 ++++--
.../backend/cache/CachedSchemaTransaction.java | 69 +++++-
.../hugegraph/backend/tx/GraphTransaction.java | 2 -
.../hugegraph/unit/cache/CacheManagerTest.java | 26 ++-
.../unit/cache/CachedGraphTransactionTest.java | 232 ++++++++++++++++++-
.../unit/cache/CachedSchemaTransactionTest.java | 257 ++++++++++++++++++++-
11 files changed, 727 insertions(+), 53 deletions(-)
diff --git
a/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
b/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
index b37c67133..fbdf460a9 100644
---
a/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
+++
b/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
@@ -149,7 +149,27 @@ public class EventHub {
return count;
}
+ /**
+ * Notify all registered listeners for {@code event} EXCEPT
+ * {@code ignoredListener}. ANY_EVENT listeners are notified unless they
+ * are the ignored one.
+ *
+ * @return a {@code Future<Integer>} resolving to the count of listeners
+ * actually invoked (the ignored listener is NOT counted)
+ */
+ public Future<Integer> notifyExcept(String event,
+ EventListener ignoredListener,
+ @Nullable Object... args) {
+ return this.notify(event, ignoredListener, args);
+ }
+
public Future<Integer> notify(String event, @Nullable Object... args) {
+ return this.notify(event, null, args);
+ }
+
+ private Future<Integer> notify(String event,
+ EventListener ignoredListener,
+ @Nullable Object... args) {
@SuppressWarnings("resource")
ExtendableIterator<EventListener> all = new ExtendableIterator<>();
@@ -173,8 +193,12 @@ public class EventHub {
int count = 0;
// Notify all listeners, and ignore the results
while (all.hasNext()) {
+ EventListener listener = all.next();
+ if (listener == ignoredListener) {
+ continue;
+ }
try {
- all.next().event(ev);
+ listener.event(ev);
count++;
} catch (Throwable e) {
LOG.warn("Failed to handle event: {}", ev, e);
diff --git
a/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
b/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
index dff022702..69472bc8e 100644
---
a/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
+++
b/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
@@ -388,6 +388,43 @@ public class EventHubTest extends BaseUnitTest {
Assert.assertEquals(1, count.get());
}
+ @Test
+ public void testNotifyExcept() throws Exception {
+ final String notify = "event-notify";
+ AtomicInteger listenerACount = new AtomicInteger();
+ AtomicInteger listenerBCount = new AtomicInteger();
+ AtomicInteger listenerCCount = new AtomicInteger();
+
+ EventListener listenerA = event -> {
+ event.checkArgs(String.class);
+ Assert.assertEquals("fake-arg", event.args()[0]);
+ listenerACount.incrementAndGet();
+ return true;
+ };
+ EventListener listenerB = event -> {
+ listenerBCount.incrementAndGet();
+ return true;
+ };
+ EventListener listenerC = event -> {
+ event.checkArgs(String.class);
+ Assert.assertEquals("fake-arg", event.args()[0]);
+ listenerCCount.incrementAndGet();
+ return true;
+ };
+
+ this.eventHub.listen(notify, listenerA);
+ this.eventHub.listen(notify, listenerB);
+ this.eventHub.listen(EventHub.ANY_EVENT, listenerC);
+
+ Assert.assertEquals(2, (int) this.eventHub
+ .notifyExcept(notify, listenerB,
+ "fake-arg")
+ .get());
+ Assert.assertEquals(1, listenerACount.get());
+ Assert.assertEquals(0, listenerBCount.get());
+ Assert.assertEquals(1, listenerCCount.get());
+ }
+
@Test
public void testEventNotifyWithMultiThreads() throws InterruptedException {
final String notify = "event-notify";
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index faf97aa8d..f8f24ab62 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -1394,7 +1394,7 @@ public class StandardHugeGraph implements HugeGraph {
"Expect event action argument");
String action = (String) args[0];
LOG.debug("Event action: {}", action);
- if (Cache.ACTION_INVALIDED.equals(action)) {
+ if (Cache.ACTION_INVALID.equals(action)) {
event.checkArgs(String.class, HugeType.class,
Object.class);
HugeType type = (HugeType) args[1];
Object ids = args[2];
@@ -1410,7 +1410,7 @@ public class StandardHugeGraph implements HugeGraph {
E.checkArgument(false, "Unexpected argument: %s",
ids);
}
return true;
- } else if (Cache.ACTION_CLEARED.equals(action)) {
+ } else if (Cache.ACTION_CLEAR.equals(action)) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
LOG.debug("Calling proxy.clear with type: {}", type);
@@ -1435,17 +1435,20 @@ public class StandardHugeGraph implements HugeGraph {
@Override
public void invalid(HugeType type, Id id) {
- this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id);
+ this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ Cache.ACTION_INVALID, type, id);
}
@Override
public void invalid2(HugeType type, Object[] ids) {
- this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids);
+ this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ Cache.ACTION_INVALID, type, ids);
}
@Override
public void clear(HugeType type) {
- this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
+ this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ Cache.ACTION_CLEAR, type);
}
@Override
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
index f58a9ce45..73910fd08 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
@@ -24,8 +24,6 @@ public interface Cache<K, V> {
String ACTION_INVALID = "invalid";
String ACTION_CLEAR = "clear";
- String ACTION_INVALIDED = "invalided";
- String ACTION_CLEARED = "cleared";
V get(K id);
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
new file mode 100644
index 000000000..b4133679b
--- /dev/null
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.backend.cache;
+
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
+
+/*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+final class CacheListenerHolder {
+
+ final EventListener listener;
+ final EventHub hub;
+ // Must only be read or written inside ConcurrentMap.compute() for the
+ // enclosing registry; ConcurrentHashMap.compute() serialises per-key access.
+ int refCount;
+
+ CacheListenerHolder(EventListener listener, EventHub hub) {
+ this.listener = listener;
+ this.hub = hub;
+ this.refCount = 1;
+ }
+}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
index ed49082f2..dadfd7ec7 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -24,6 +24,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.CachedBackendStore.QueryId;
@@ -60,11 +62,20 @@ public final class CachedGraphTransaction extends
GraphTransaction {
private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
private static final long AVG_EDGE_ENTRY_SIZE = 100L;
+ /*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+ private static final ConcurrentMap<String, CacheListenerHolder>
+ GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
+
private final Cache<Id, Object> verticesCache;
private final Cache<Id, Object> edgesCache;
private EventListener storeEventListener;
private EventListener cacheEventListener;
+ private CacheListenerHolder holder;
public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
@@ -138,7 +149,7 @@ public final class CachedGraphTransaction extends
GraphTransaction {
}
// Listen cache event: "cache"(invalid cache item)
- this.cacheEventListener = event -> {
+ EventListener listener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
Object[] args = event.args();
@@ -184,18 +195,52 @@ public final class CachedGraphTransaction extends
GraphTransaction {
}
return false;
};
- if (graphCacheListenStatus.putIfAbsent(this.params().spaceGraphName(),
true) == null) {
- EventHub graphEventHub = this.params().graphEventHub();
- graphEventHub.listen(Events.CACHE, this.cacheEventListener);
- }
+ EventHub graphEventHub = this.params().graphEventHub();
+ String graphName = this.params().spaceGraphName();
+ CacheListenerHolder acquired = GRAPH_CACHE_EVENT_LISTENERS.compute(
+ graphName, (key, existing) -> {
+ if (existing == null || existing.hub != graphEventHub) {
+ // Graph close/reopen creates a new EventHub for the
+ // same graph name; replace the stale holder. Old
+ // transactions skip decrement via identity check.
+ if (existing != null) {
+ existing.hub.unlisten(Events.CACHE,
+ existing.listener);
+ }
+ graphEventHub.listen(Events.CACHE, listener);
+ return new CacheListenerHolder(listener,
graphEventHub);
+ }
+ existing.refCount++;
+ return existing;
+ });
+ this.holder = acquired;
+ this.cacheEventListener = acquired.listener;
}
private void unlistenChanges() {
String graphName = this.params().spaceGraphName();
- if (graphCacheListenStatus.remove(graphName) != null) {
- EventHub graphEventHub = this.params().graphEventHub();
- graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+ CacheListenerHolder ours = this.holder;
+ if (ours != null) {
+ GRAPH_CACHE_EVENT_LISTENERS.compute(graphName, (key, existing) -> {
+ if (existing == null || existing != ours) {
+ return existing;
+ }
+ existing.refCount--;
+ if (existing.refCount == 0) {
+ existing.hub.unlisten(Events.CACHE, existing.listener);
+ return null;
+ }
+ return existing;
+ });
+ this.holder = null;
+ this.cacheEventListener = null;
}
+ // TODO (follow-up): storeEventListenStatus has the same owner-first
+ // close bug this PR fixes for GRAPH_CACHE_EVENT_LISTENERS. A non-owner
+ // transaction can remove the tracking entry, unlisten its own
+ // never-registered storeEventListener as a no-op, and leave the
+ // original store listener registered but untracked. Apply the same
+ // ref-counted holder pattern in a follow-up PR.
if (storeEventListenStatus.remove(graphName) != null) {
this.store().provider().unlisten(this.storeEventListener);
}
@@ -203,12 +248,14 @@ public final class CachedGraphTransaction extends
GraphTransaction {
private void notifyChanges(String action, HugeType type, Id[] ids) {
EventHub graphEventHub = this.params().graphEventHub();
- graphEventHub.notify(Events.CACHE, action, type, ids);
+ graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ action, type, ids);
}
private void notifyChanges(String action, HugeType type) {
EventHub graphEventHub = this.params().graphEventHub();
- graphEventHub.notify(Events.CACHE, action, type);
+ graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ action, type);
}
public void clearCache(HugeType type, boolean notify) {
@@ -220,7 +267,7 @@ public final class CachedGraphTransaction extends
GraphTransaction {
}
if (notify) {
- this.notifyChanges(Cache.ACTION_CLEARED, null);
+ this.notifyChanges(Cache.ACTION_CLEAR, null);
}
}
@@ -397,7 +444,7 @@ public final class CachedGraphTransaction extends
GraphTransaction {
this.verticesCache.invalidate(vertex.id());
}
if (vertexOffset > 0) {
- this.notifyChanges(Cache.ACTION_INVALIDED,
+ this.notifyChanges(Cache.ACTION_INVALID,
HugeType.VERTEX, vertexIds);
}
}
@@ -411,7 +458,7 @@ public final class CachedGraphTransaction extends
GraphTransaction {
if (invalidEdgesCache && this.enableCacheEdge()) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
- this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
+ this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
}
}
}
@@ -425,7 +472,7 @@ public final class CachedGraphTransaction extends
GraphTransaction {
if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
- this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
+ this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
}
}
}
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
index 20a355e87..d393be746 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeGraphParams;
@@ -42,6 +43,14 @@ import com.google.common.collect.ImmutableSet;
public final class CachedSchemaTransaction extends SchemaTransaction {
+ /*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+ private static final ConcurrentMap<String, CacheListenerHolder>
+ SCHEMA_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
+
private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;
@@ -49,6 +58,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
private EventListener storeEventListener;
private EventListener cacheEventListener;
+ private CacheListenerHolder holder;
public CachedSchemaTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
@@ -111,7 +121,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.store().provider().listen(this.storeEventListener);
// Listen cache event: "cache"(invalid cache item)
- this.cacheEventListener = event -> {
+ EventListener listener = event -> {
LOG.debug("Graph {} received schema cache event: {}",
this.graph(), event);
Object[] args = event.args();
@@ -132,9 +142,26 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
return false;
};
EventHub schemaEventHub = this.params().schemaEventHub();
- if (!schemaEventHub.containsListener(Events.CACHE)) {
- schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
- }
+ String graph = this.params().spaceGraphName();
+ CacheListenerHolder acquired = SCHEMA_CACHE_EVENT_LISTENERS.compute(
+ graph, (key, existing) -> {
+ if (existing == null || existing.hub != schemaEventHub) {
+ // Graph close/reopen creates a new EventHub for the
+ // same graph name; replace the stale holder. Old
+ // transactions skip decrement via identity check.
+ if (existing != null) {
+ existing.hub.unlisten(Events.CACHE,
+ existing.listener);
+ }
+ schemaEventHub.listen(Events.CACHE, listener);
+ return new CacheListenerHolder(listener,
+ schemaEventHub);
+ }
+ existing.refCount++;
+ return existing;
+ });
+ this.holder = acquired;
+ this.cacheEventListener = acquired.listener;
}
private void unlistenChanges() {
@@ -142,18 +169,36 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.store().provider().unlisten(this.storeEventListener);
// Unlisten cache event
- EventHub schemaEventHub = this.params().schemaEventHub();
- schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+ CacheListenerHolder ours = this.holder;
+ if (ours != null) {
+ SCHEMA_CACHE_EVENT_LISTENERS.compute(
+ this.params().spaceGraphName(), (key, existing) -> {
+ if (existing == null || existing != ours) {
+ return existing;
+ }
+ existing.refCount--;
+ if (existing.refCount == 0) {
+ existing.hub.unlisten(Events.CACHE,
+ existing.listener);
+ return null;
+ }
+ return existing;
+ });
+ this.holder = null;
+ this.cacheEventListener = null;
+ }
}
private void notifyChanges(String action, HugeType type, Id id) {
EventHub graphEventHub = this.params().schemaEventHub();
- graphEventHub.notify(Events.CACHE, action, type, id);
+ graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ action, type, id);
}
private void notifyChanges(String action, HugeType type) {
EventHub graphEventHub = this.params().schemaEventHub();
- graphEventHub.notify(Events.CACHE, action, type);
+ graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+ action, type);
}
private void resetCachedAll(HugeType type) {
@@ -179,7 +224,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.arrayCaches.clear();
if (notify) {
- this.notifyChanges(Cache.ACTION_CLEARED, null);
+ this.notifyChanges(Cache.ACTION_CLEAR, null);
}
}
@@ -221,7 +266,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.updateCache(schema);
- this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+ this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
}
@Override
@@ -230,7 +275,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.updateCache(schema);
- this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+ this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
}
@Override
@@ -283,7 +328,7 @@ public final class CachedSchemaTransaction extends
SchemaTransaction {
this.invalidateCache(schema.type(), schema.id());
- this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+ this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
}
@Override
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
index 763ccaa0e..2910958db 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
@@ -140,8 +140,6 @@ public class GraphTransaction extends IndexableTransaction {
private final int verticesCapacity;
private final int edgesCapacity;
- protected static final ConcurrentHashMap<String, Boolean>
graphCacheListenStatus =
- new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<String, Boolean>
storeEventListenStatus =
new ConcurrentHashMap<>();
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
index a525bfcb9..8d7ef2166 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.unit.cache;
import java.lang.reflect.Proxy;
import java.util.Map;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hugegraph.backend.cache.Cache;
@@ -271,6 +272,8 @@ public class CacheManagerTest extends BaseUnitTest {
@Test
public void testCacheExpire() {
+ CacheManager manager = CacheManager.instance();
+
Cache<Id, Object> cache1 = new RamCache();
cache1.expire(26 * 1000L);
@@ -285,17 +288,23 @@ public class CacheManagerTest extends BaseUnitTest {
mockCache2);
Mockito.when(this.mockCaches.entrySet()).thenReturn(caches.entrySet());
- cache1.update(IdGenerator.of("fake-id"), "fake-value");
- cache2.update(IdGenerator.of("fake-id"), "fake-value");
+ TimerTask task = Whitebox.invoke(CacheManager.class, "scheduleTimer",
+ manager, 0.1F);
+ try {
+ cache1.update(IdGenerator.of("fake-id"), "fake-value",
+ -30 * 1000L);
+ cache2.update(IdGenerator.of("fake-id"), "fake-value");
- waitTillNext(40);
+ waitTillNext(3);
- // Would call tick() per 30s
- Mockito.verify(mockCache1, Mockito.times(1)).tick();
- Mockito.verify(mockCache2, Mockito.times(1)).tick();
+ Mockito.verify(mockCache1, Mockito.atLeastOnce()).tick();
+ Mockito.verify(mockCache2, Mockito.atLeastOnce()).tick();
- Assert.assertEquals(0, cache1.size());
- Assert.assertEquals(1, cache2.size());
+ Assert.assertEquals(0, cache1.size());
+ Assert.assertEquals(1, cache2.size());
+ } finally {
+ task.cancel();
+ }
}
@SuppressWarnings({"unused", "unchecked"})
@@ -308,4 +317,3 @@ public class CacheManagerTest extends BaseUnitTest {
return (Cache<Id, Object>) p;
}
}
-
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
index 4a1c18dbe..7bcc1a7fe 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
@@ -17,12 +17,21 @@
package org.apache.hugegraph.unit.cache;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.cache.CachedGraphTransaction;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeVertex;
@@ -42,19 +51,30 @@ public class CachedGraphTransactionTest extends
BaseUnitTest {
private CachedGraphTransaction cache;
private HugeGraphParams params;
+ private HugeGraph graph;
@Before
public void setup() {
- HugeGraph graph = HugeFactory.open(FakeObjects.newConfig());
- this.params = Whitebox.getInternalState(graph, "params");
+ this.graph = HugeFactory.open(FakeObjects.newConfig());
+ this.params = Whitebox.getInternalState(this.graph, "params");
this.cache = new CachedGraphTransaction(this.params,
this.params.loadGraphStore());
}
@After
public void teardown() throws Exception {
- this.cache().graph().clearBackend();
- this.cache().graph().close();
+ try {
+ if (this.cache != null) {
+ this.cache.close();
+ }
+ } finally {
+ this.cache = null;
+ if (this.graph != null) {
+ this.graph.clearBackend();
+ this.graph.close();
+ this.graph = null;
+ }
+ }
}
private CachedGraphTransaction cache() {
@@ -62,6 +82,42 @@ public class CachedGraphTransactionTest extends BaseUnitTest
{
return this.cache;
}
+ @SuppressWarnings("unchecked")
+ private static ConcurrentMap<String, Object> graphCacheEventListeners()
+ throws Exception {
+ Field field = CachedGraphTransaction.class
+ .getDeclaredField(
+
"GRAPH_CACHE_EVENT_LISTENERS");
+ field.setAccessible(true);
+ return (ConcurrentMap<String, Object>) field.get(null);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static ConcurrentMap<String, Boolean> storeEventListenStatus()
+ throws Exception {
+ Field field = GraphTransaction.class
+
.getDeclaredField("storeEventListenStatus");
+ field.setAccessible(true);
+ return (ConcurrentMap<String, Boolean>) field.get(null);
+ }
+
+ private static void restoreStoreListenerStatusForKnownTeardownBug(
+ ConcurrentMap<String, Boolean> storeListeners, String graphName) {
+ // Closing a secondary transaction can consume storeEventListenStatus due
+ // to the follow-up bug documented in CachedGraphTransaction.unlistenChanges().
+ // Restore it so teardown can still unregister the primary store listener.
+ storeListeners.putIfAbsent(graphName, true);
+ }
+
+ private static EventListener holderListener(Object holder) {
+ return Whitebox.getInternalState(holder, "listener");
+ }
+
+ private static int holderRefCount(Object holder) {
+ Integer refCount = Whitebox.getInternalState(holder, "refCount");
+ return refCount;
+ }
+
private HugeVertex newVertex(Id id) {
HugeGraph graph = this.cache().graph();
graph.schema().propertyKey("name").asText()
@@ -138,6 +194,174 @@ public class CachedGraphTransactionTest extends
BaseUnitTest {
Whitebox.invoke(cache, "verticesCache", "size"));
}
+ @Test
+ public void testClearCacheEmitsActionClear() throws Exception {
+ // Producers must emit the present-tense ACTION_CLEAR / ACTION_INVALID,
+ // not the legacy past-tense variants - otherwise local listeners that
+ // match only the present-tense actions silently drop the event.
+ CachedGraphTransaction cache = this.cache();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<String> action = new AtomicReference<>();
+ EventListener listener = event -> {
+ Object[] args = event.args();
+ if (args.length > 0 && args[0] instanceof String) {
+ action.set((String) args[0]);
+ latch.countDown();
+ }
+ return true;
+ };
+ this.params.graphEventHub().listen(Events.CACHE, listener);
+ try {
+ cache.clearCache(HugeType.VERTEX, true);
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(Cache.ACTION_CLEAR, action.get());
+ } finally {
+ this.params.graphEventHub().unlisten(Events.CACHE, listener);
+ }
+ }
+
+ @Test
+ public void testVertexMutationEmitsActionInvalid() throws Exception {
+ CachedGraphTransaction cache = this.cache();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<String> action = new AtomicReference<>();
+ EventListener listener = event -> {
+ Object[] args = event.args();
+ if (args.length > 0 && Cache.ACTION_INVALID.equals(args[0])) {
+ action.set((String) args[0]);
+ latch.countDown();
+ }
+ return true;
+ };
+ this.params.graphEventHub().listen(Events.CACHE, listener);
+ try {
+ cache.addVertex(this.newVertex(IdGenerator.of(1)));
+ cache.commit();
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+ } finally {
+ this.params.graphEventHub().unlisten(Events.CACHE, listener);
+ }
+ }
+
+ @Test
+ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
+ throws Exception {
+ ConcurrentMap<String, Object> cacheListeners =
+ graphCacheEventListeners();
+ ConcurrentMap<String, Boolean> storeListeners =
+ storeEventListenStatus();
+
+ String graphName = this.params.spaceGraphName();
+ Object holder = cacheListeners.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ int refCount = holderRefCount(holder);
+
+ CachedGraphTransaction second = new CachedGraphTransaction(
+ this.params, this.params.loadGraphStore());
+ Assert.assertSame(holder, cacheListeners.get(graphName));
+ Assert.assertEquals(refCount + 1, holderRefCount(holder));
+
+ try {
+ second.close();
+
+ Assert.assertSame(holder, cacheListeners.get(graphName));
+ Assert.assertEquals(refCount, holderRefCount(holder));
+ Assert.assertTrue(this.params.graphEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+ } finally {
+ restoreStoreListenerStatusForKnownTeardownBug(storeListeners,
+ graphName);
+ }
+ }
+
+ @Test
+ public void testCacheListenerSurvivesOwnerClose() throws Exception {
+ ConcurrentMap<String, Object> cacheListeners =
+ graphCacheEventListeners();
+ String graphName = this.params.spaceGraphName();
+ CachedGraphTransaction owner = this.cache();
+ CachedGraphTransaction second = new CachedGraphTransaction(
+ this.params, this.params.loadGraphStore());
+
+ Object holder = cacheListeners.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ int refCount = holderRefCount(holder);
+ Assert.assertTrue(refCount >= 2);
+
+ owner.close();
+ this.cache = second;
+
+ Assert.assertSame(holder, cacheListeners.get(graphName));
+ Assert.assertEquals(refCount - 1, holderRefCount(holder));
+ Assert.assertTrue(this.params.graphEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+
+ second.addVertex(this.newVertex(IdGenerator.of(1)));
+ second.addVertex(this.newVertex(IdGenerator.of(2)));
+ second.commit();
+ Assert.assertTrue(second.queryVertices(IdGenerator.of(1)).hasNext());
+ Assert.assertTrue(second.queryVertices(IdGenerator.of(2)).hasNext());
+ Assert.assertEquals(2L,
+ Whitebox.invoke(second, "verticesCache", "size"));
+
+ this.params.graphEventHub().notify(Events.CACHE, Cache.ACTION_INVALID,
+ HugeType.VERTEX, IdGenerator.of(1))
+ .get();
+
+ Assert.assertEquals(1L,
+ Whitebox.invoke(second, "verticesCache", "size"));
+ }
+
+ @Test
+ public void testLastCloseRemovesGraphCacheListener() throws Exception {
+ ConcurrentMap<String, Object> cacheListeners =
+ graphCacheEventListeners();
+ String graphName = this.params.spaceGraphName();
+ CachedGraphTransaction owner = this.cache();
+ CachedGraphTransaction second = new CachedGraphTransaction(
+ this.params, this.params.loadGraphStore());
+
+ Object holder = cacheListeners.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ Assert.assertTrue(holderRefCount(holder) >= 2);
+
+ owner.close();
+ second.close();
+ this.cache = null;
+ this.params.graphTransaction().close();
+
+ Assert.assertFalse(cacheListeners.containsKey(graphName));
+ Assert.assertFalse(this.params.graphEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+
+ this.graph.clearBackend();
+ this.graph.close();
+ this.graph = null;
+
+ HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig());
+ this.graph = reopened;
+ this.params = Whitebox.getInternalState(reopened, "params");
+ Object reopenedHolder = cacheListeners.get(graphName);
+ Assert.assertNotNull(reopenedHolder);
+ Assert.assertNotSame(holder, reopenedHolder);
+ int reopenedRefCount = holderRefCount(reopenedHolder);
+ CachedGraphTransaction third = new CachedGraphTransaction(
+ this.params, this.params.loadGraphStore());
+ this.cache = third;
+ Object newHolder = cacheListeners.get(graphName);
+ Assert.assertSame(reopenedHolder, newHolder);
+ Assert.assertEquals(reopenedRefCount + 1, holderRefCount(newHolder));
+ }
+
@Test
public void testEdgeCacheClearWhenDeleteVertex() {
CachedGraphTransaction cache = this.cache();
diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
index efb63fad2..9921670ee 100644
--- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
+++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
@@ -23,21 +23,26 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
+import org.apache.hugegraph.backend.cache.CacheNotifier;
import org.apache.hugegraph.backend.cache.CacheManager;
import org.apache.hugegraph.backend.cache.CachedSchemaTransaction;
import org.apache.hugegraph.backend.cache.CachedSchemaTransactionV2;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.meta.MetaDriver;
import org.apache.hugegraph.meta.MetaManager;
import org.apache.hugegraph.meta.managers.GraphMetaManager;
@@ -60,19 +65,30 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
private CachedSchemaTransaction cache;
private HugeGraphParams params;
+ private HugeGraph graph;
@Before
public void setup() {
- HugeGraph graph = HugeFactory.open(FakeObjects.newConfig());
- this.params = Whitebox.getInternalState(graph, "params");
+ this.graph = HugeFactory.open(FakeObjects.newConfig());
+ this.params = Whitebox.getInternalState(this.graph, "params");
this.cache = new CachedSchemaTransaction(this.params,
this.params.loadSchemaStore());
}
@After
public void teardown() throws Exception {
- this.cache().graph().clearBackend();
- this.cache().graph().close();
+ try {
+ if (this.cache != null) {
+ this.cache.close();
+ }
+ } finally {
+ this.cache = null;
+ if (this.graph != null) {
+ this.graph.clearBackend();
+ this.graph.close();
+ this.graph = null;
+ }
+ }
}
private CachedSchemaTransaction cache() {
@@ -80,6 +96,25 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
return this.cache;
}
+ @SuppressWarnings("unchecked")
+ private static ConcurrentMap<String, Object> schemaCacheEventListeners()
+ throws Exception {
+ Field field = CachedSchemaTransaction.class
+ .getDeclaredField(
+ "SCHEMA_CACHE_EVENT_LISTENERS");
+ field.setAccessible(true);
+ return (ConcurrentMap<String, Object>) field.get(null);
+ }
+
+ private static EventListener holderListener(Object holder) {
+ return Whitebox.getInternalState(holder, "listener");
+ }
+
+ private static int holderRefCount(Object holder) {
+ Integer refCount = Whitebox.getInternalState(holder, "refCount");
+ return refCount;
+ }
+
@Test
public void testEventClear() throws Exception {
CachedSchemaTransaction cache = this.cache();
@@ -187,6 +222,176 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
cache.getPropertyKey(IdGenerator.of(1)).name());
}
+ @Test
+ public void testLegacySchemaChangeEmitsActionInvalid()
+ throws Exception {
+ CachedSchemaTransaction cache = this.cache();
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<String> action = new AtomicReference<>();
+ EventListener listener = event -> {
+ Object[] args = event.args();
+ if (args.length > 0) {
+ action.set((String) args[0]);
+ latch.countDown();
+ }
+ return true;
+ };
+ this.params.schemaEventHub().listen(Events.CACHE, listener);
+
+ try {
+ FakeObjects objects = new FakeObjects("unit-test");
+ cache.addPropertyKey(objects.newPropertyKey(IdGenerator.of(1),
+ "fake-pk-1"));
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+ Assert.assertEquals(1L, Whitebox.invoke(cache, "idCache", "size"));
+ Assert.assertEquals(1L,
+ Whitebox.invoke(cache, "nameCache", "size"));
+ Assert.assertEquals("fake-pk-1",
+ cache.getPropertyKey(IdGenerator.of(1))
+ .name());
+ Assert.assertEquals(IdGenerator.of(1),
+ cache.getPropertyKey("fake-pk-1").id());
+ } finally {
+ this.params.schemaEventHub().unlisten(Events.CACHE, listener);
+ }
+ }
+
+ @Test
+ public void testCacheListenerSurvivesOwnerClose() throws Exception {
+ ConcurrentMap<String, Object> registry = schemaCacheEventListeners();
+ String graphName = this.params.spaceGraphName();
+ CachedSchemaTransaction owner = this.cache();
+ CachedSchemaTransaction second = new CachedSchemaTransaction(
+ this.params, this.params.loadSchemaStore());
+
+ Object holder = registry.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ int refCount = holderRefCount(holder);
+ Assert.assertTrue(refCount >= 2);
+
+ owner.close();
+ this.cache = second;
+
+ Assert.assertSame(holder, registry.get(graphName));
+ Assert.assertEquals(refCount - 1, holderRefCount(holder));
+ Assert.assertTrue(this.params.schemaEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+
+ FakeObjects objects = new FakeObjects("unit-test");
+ second.addPropertyKey(objects.newPropertyKey(IdGenerator.of(1),
+ "fake-pk-1"));
+ second.addPropertyKey(objects.newPropertyKey(IdGenerator.of(2),
+ "fake-pk-2"));
+ Assert.assertEquals(2L, Whitebox.invoke(second, "idCache", "size"));
+ Assert.assertEquals(2L, Whitebox.invoke(second, "nameCache", "size"));
+
+ this.params.schemaEventHub().notify(Events.CACHE, Cache.ACTION_CLEAR,
+ null).get();
+
+ Assert.assertEquals(0L, Whitebox.invoke(second, "idCache", "size"));
+ Assert.assertEquals(0L, Whitebox.invoke(second, "nameCache", "size"));
+ }
+
+ @Test
+ public void testLastCloseRemovesSchemaCacheListener() throws Exception {
+ ConcurrentMap<String, Object> registry = schemaCacheEventListeners();
+ String graphName = this.params.spaceGraphName();
+ CachedSchemaTransaction owner = this.cache();
+ CachedSchemaTransaction second = new CachedSchemaTransaction(
+ this.params, this.params.loadSchemaStore());
+
+ Object holder = registry.get(graphName);
+ Assert.assertNotNull(holder);
+ EventListener registered = holderListener(holder);
+ Assert.assertTrue(holderRefCount(holder) >= 2);
+
+ owner.close();
+ second.close();
+ this.cache = null;
+ this.params.schemaTransaction().close();
+
+ Assert.assertFalse(registry.containsKey(graphName));
+ Assert.assertFalse(this.params.schemaEventHub()
+ .listeners(Events.CACHE)
+ .contains(registered));
+
+ this.graph.clearBackend();
+ this.graph.close();
+ this.graph = null;
+
+ HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig());
+ this.graph = reopened;
+ this.params = Whitebox.getInternalState(reopened, "params");
+ Object reopenedHolder = registry.get(graphName);
+ Assert.assertNotNull(reopenedHolder);
+ Assert.assertNotSame(holder, reopenedHolder);
+ int reopenedRefCount = holderRefCount(reopenedHolder);
+ CachedSchemaTransaction third = new CachedSchemaTransaction(
+ this.params, this.params.loadSchemaStore());
+ this.cache = third;
+ Object newHolder = registry.get(graphName);
+ Assert.assertSame(reopenedHolder, newHolder);
+ Assert.assertEquals(reopenedRefCount + 1, holderRefCount(newHolder));
+ }
+
+ @Test
+ public void testCacheNotifierLocalEventCallsProxyOnce()
+ throws Exception {
+ EventHub hub = new EventHub("schema-cache-notifier-test");
+ AtomicInteger proxyInvalidCalls = new AtomicInteger();
+ CacheNotifier proxy = newCacheNotifier(proxyInvalidCalls,
+ new AtomicInteger(),
+ new AtomicInteger());
+ CacheNotifier notifier = newSchemaCacheNotifier(hub, proxy);
+
+ try {
+ Assert.assertEquals(1, (int) hub.notify(Events.CACHE,
+ Cache.ACTION_INVALID,
+ HugeType.PROPERTY_KEY,
+ IdGenerator.of(1))
+ .get());
+ Assert.assertEquals(1, proxyInvalidCalls.get());
+ } finally {
+ notifier.close();
+ }
+ }
+
+ @Test
+ public void testCacheNotifierRpcInvalidDoesNotLoopToProxy()
+ throws Exception {
+ EventHub hub = new EventHub("schema-cache-notifier-test");
+ AtomicInteger proxyInvalidCalls = new AtomicInteger();
+ AtomicInteger localInvalidCalls = new AtomicInteger();
+ CountDownLatch latch = new CountDownLatch(1);
+ CacheNotifier proxy = newCacheNotifier(proxyInvalidCalls,
+ new AtomicInteger(),
+ new AtomicInteger());
+ CacheNotifier notifier = newSchemaCacheNotifier(hub, proxy);
+ EventListener localListener = event -> {
+ event.checkArgs(String.class, HugeType.class, Id.class);
+ if (Cache.ACTION_INVALID.equals(event.args()[0])) {
+ localInvalidCalls.incrementAndGet();
+ latch.countDown();
+ }
+ return true;
+ };
+ hub.listen(Events.CACHE, localListener);
+
+ try {
+ notifier.invalid(HugeType.PROPERTY_KEY, IdGenerator.of(1));
+
+ Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+ Assert.assertEquals(1, localInvalidCalls.get());
+ Assert.assertEquals(0, proxyInvalidCalls.get());
+ } finally {
+ notifier.close();
+ }
+ }
+
@Test
public void testClearV2SchemaCacheByGraphName() {
String graphName = "DEFAULT-unit-test-v2";
@@ -684,6 +889,50 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
return previous;
}
+ private static CacheNotifier newSchemaCacheNotifier(EventHub hub,
+ CacheNotifier proxy)
+ throws Exception {
+ Class<?> clazz = Class.forName("org.apache.hugegraph." +
+ "StandardHugeGraph$" +
+ "HugeSchemaCacheNotifier");
+ Constructor<?> constructor = clazz.getDeclaredConstructor(
+ EventHub.class, CacheNotifier.class);
+ constructor.setAccessible(true);
+ return (CacheNotifier) constructor.newInstance(hub, proxy);
+ }
+
+ private static CacheNotifier newCacheNotifier(AtomicInteger invalidCalls,
+ AtomicInteger invalid2Calls,
+ AtomicInteger clearCalls) {
+ return new CacheNotifier() {
+
+ @Override
+ public void invalid(HugeType type, Id id) {
+ invalidCalls.incrementAndGet();
+ }
+
+ @Override
+ public void invalid2(HugeType type, Object[] ids) {
+ invalid2Calls.incrementAndGet();
+ }
+
+ @Override
+ public void clear(HugeType type) {
+ clearCalls.incrementAndGet();
+ }
+
+ @Override
+ public void reload() {
+ // pass
+ }
+
+ @Override
+ public void close() {
+ // pass
+ }
+ };
+ }
+
@SuppressWarnings("unchecked")
private static <T> T readField(Object target, String name)
throws ReflectiveOperationException {