This is an automated email from the ASF dual-hosted git repository.

imbajin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new f69ca66cd fix(server): align cache event actions in legacy EventHub path (#3017)
f69ca66cd is described below

commit f69ca66cd6c7e1e466cd6f3f28fc843b0ea64409
Author: Davide Polato <[email protected]>
AuthorDate: Fri May 15 05:25:33 2026 +0200

    fix(server): align cache event actions in legacy EventHub path (#3017)
    
    - Align legacy cache invalidation producers and listeners on
      ACTION_INVALID and ACTION_CLEAR, removing the obsolete
      ACTION_INVALIDED/ACTION_CLEARED constants.
    - Add EventHub.notifyExcept(...) so cache transactions and the cache
      notifier bridge can avoid re-processing their own local listener while
      still delivering events to other listeners.
    - Track registered graph/schema cache listeners per graph so
      notifyExcept(...) uses the listener instance actually registered on the
      EventHub, including multi-transaction cases where later transactions
      reuse the first listener.
    - Update cache notifier forwarding to prevent local RPC bridge loops after
      action names are unified.
    - Add regression coverage for notifyExcept semantics, graph/schema action
      names, listener teardown/re-registration, and notifier no-loop behavior.
    
    - The holder keeps the EventHub listener registered while any transaction
      for the graph is alive, and unregisters/removes it only when the last
      transaction releases it. The registry update, ref-count decrement, and
      hub unlisten now run inside ConcurrentMap.compute() to avoid
      owner-closes-first invalidation gaps.
    
      Also add graph/schema regression coverage for owner-first close and
      last-close cleanup, including graph close/reopen handling for stale
      EventHub holders.
---
 .../java/org/apache/hugegraph/event/EventHub.java  |  26 ++-
 .../apache/hugegraph/unit/event/EventHubTest.java  |  37 +++
 .../org/apache/hugegraph/StandardHugeGraph.java    |  13 +-
 .../org/apache/hugegraph/backend/cache/Cache.java  |   2 -
 .../backend/cache/CacheListenerHolder.java         |  41 ++++
 .../backend/cache/CachedGraphTransaction.java      |  75 ++++--
 .../backend/cache/CachedSchemaTransaction.java     |  69 +++++-
 .../hugegraph/backend/tx/GraphTransaction.java     |   2 -
 .../hugegraph/unit/cache/CacheManagerTest.java     |  26 ++-
 .../unit/cache/CachedGraphTransactionTest.java     | 232 ++++++++++++++++++-
 .../unit/cache/CachedSchemaTransactionTest.java    | 257 ++++++++++++++++++++-
 11 files changed, 727 insertions(+), 53 deletions(-)

diff --git 
a/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
 
b/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
index b37c67133..fbdf460a9 100644
--- 
a/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
+++ 
b/hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java
@@ -149,7 +149,27 @@ public class EventHub {
         return count;
     }
 
+    /**
+     * Notify all registered listeners for {@code event} EXCEPT
+     * {@code ignoredListener}. ANY_EVENT listeners are notified unless they
+     * are the ignored one.
+     *
+     * @return a Future<Integer> resolving to the count of listeners actually
+     *         invoked (the ignored listener is NOT counted)
+     */
+    public Future<Integer> notifyExcept(String event,
+                                        EventListener ignoredListener,
+                                        @Nullable Object... args) {
+        return this.notify(event, ignoredListener, args);
+    }
+
     public Future<Integer> notify(String event, @Nullable Object... args) {
+        return this.notify(event, null, args);
+    }
+
+    private Future<Integer> notify(String event,
+                                   EventListener ignoredListener,
+                                   @Nullable Object... args) {
         @SuppressWarnings("resource")
         ExtendableIterator<EventListener> all = new ExtendableIterator<>();
 
@@ -173,8 +193,12 @@ public class EventHub {
             int count = 0;
             // Notify all listeners, and ignore the results
             while (all.hasNext()) {
+                EventListener listener = all.next();
+                if (listener == ignoredListener) {
+                    continue;
+                }
                 try {
-                    all.next().event(ev);
+                    listener.event(ev);
                     count++;
                 } catch (Throwable e) {
                     LOG.warn("Failed to handle event: {}", ev, e);
diff --git 
a/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
 
b/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
index dff022702..69472bc8e 100644
--- 
a/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
+++ 
b/hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java
@@ -388,6 +388,43 @@ public class EventHubTest extends BaseUnitTest {
         Assert.assertEquals(1, count.get());
     }
 
+    @Test
+    public void testNotifyExcept() throws Exception {
+        final String notify = "event-notify";
+        AtomicInteger listenerACount = new AtomicInteger();
+        AtomicInteger listenerBCount = new AtomicInteger();
+        AtomicInteger listenerCCount = new AtomicInteger();
+
+        EventListener listenerA = event -> {
+            event.checkArgs(String.class);
+            Assert.assertEquals("fake-arg", event.args()[0]);
+            listenerACount.incrementAndGet();
+            return true;
+        };
+        EventListener listenerB = event -> {
+            listenerBCount.incrementAndGet();
+            return true;
+        };
+        EventListener listenerC = event -> {
+            event.checkArgs(String.class);
+            Assert.assertEquals("fake-arg", event.args()[0]);
+            listenerCCount.incrementAndGet();
+            return true;
+        };
+
+        this.eventHub.listen(notify, listenerA);
+        this.eventHub.listen(notify, listenerB);
+        this.eventHub.listen(EventHub.ANY_EVENT, listenerC);
+
+        Assert.assertEquals(2, (int) this.eventHub
+                                          .notifyExcept(notify, listenerB,
+                                                        "fake-arg")
+                                          .get());
+        Assert.assertEquals(1, listenerACount.get());
+        Assert.assertEquals(0, listenerBCount.get());
+        Assert.assertEquals(1, listenerCCount.get());
+    }
+
     @Test
     public void testEventNotifyWithMultiThreads() throws InterruptedException {
         final String notify = "event-notify";
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index faf97aa8d..f8f24ab62 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -1394,7 +1394,7 @@ public class StandardHugeGraph implements HugeGraph {
                                     "Expect event action argument");
                     String action = (String) args[0];
                     LOG.debug("Event action: {}", action);
-                    if (Cache.ACTION_INVALIDED.equals(action)) {
+                    if (Cache.ACTION_INVALID.equals(action)) {
                         event.checkArgs(String.class, HugeType.class, 
Object.class);
                         HugeType type = (HugeType) args[1];
                         Object ids = args[2];
@@ -1410,7 +1410,7 @@ public class StandardHugeGraph implements HugeGraph {
                             E.checkArgument(false, "Unexpected argument: %s", 
ids);
                         }
                         return true;
-                    } else if (Cache.ACTION_CLEARED.equals(action)) {
+                    } else if (Cache.ACTION_CLEAR.equals(action)) {
                         event.checkArgs(String.class, HugeType.class);
                         HugeType type = (HugeType) args[1];
                         LOG.debug("Calling proxy.clear with type: {}", type);
@@ -1435,17 +1435,20 @@ public class StandardHugeGraph implements HugeGraph {
 
         @Override
         public void invalid(HugeType type, Id id) {
-            this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id);
+            this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                  Cache.ACTION_INVALID, type, id);
         }
 
         @Override
         public void invalid2(HugeType type, Object[] ids) {
-            this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids);
+            this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                  Cache.ACTION_INVALID, type, ids);
         }
 
         @Override
         public void clear(HugeType type) {
-            this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
+            this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                  Cache.ACTION_CLEAR, type);
         }
 
         @Override
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
index f58a9ce45..73910fd08 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java
@@ -24,8 +24,6 @@ public interface Cache<K, V> {
 
     String ACTION_INVALID = "invalid";
     String ACTION_CLEAR = "clear";
-    String ACTION_INVALIDED = "invalided";
-    String ACTION_CLEARED = "cleared";
 
     V get(K id);
 
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
new file mode 100644
index 000000000..b4133679b
--- /dev/null
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CacheListenerHolder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.backend.cache;
+
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
+
+/*
+ * Listener lifetime must cover all active transactions for the graph.
+ * The holder is removed from the registry and unregistered from EventHub
+ * only when the last transaction releases it.
+ */
+final class CacheListenerHolder {
+
+    final EventListener listener;
+    final EventHub hub;
+    // Must only be read or written inside ConcurrentMap.compute() for the
+    // enclosing registry; ConcurrentHashMap.compute() serialises per-key 
access.
+    int refCount;
+
+    CacheListenerHolder(EventListener listener, EventHub hub) {
+        this.listener = listener;
+        this.hub = hub;
+        this.refCount = 1;
+    }
+}
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
index ed49082f2..dadfd7ec7 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hugegraph.HugeGraphParams;
 import org.apache.hugegraph.backend.cache.CachedBackendStore.QueryId;
@@ -60,11 +62,20 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
     private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
     private static final long AVG_EDGE_ENTRY_SIZE = 100L;
 
+    /*
+     * Listener lifetime must cover all active transactions for the graph.
+     * The holder is removed from the registry and unregistered from EventHub
+     * only when the last transaction releases it.
+     */
+    private static final ConcurrentMap<String, CacheListenerHolder>
+            GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
+
     private final Cache<Id, Object> verticesCache;
     private final Cache<Id, Object> edgesCache;
 
     private EventListener storeEventListener;
     private EventListener cacheEventListener;
+    private CacheListenerHolder holder;
 
     public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
         super(graph, store);
@@ -138,7 +149,7 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
         }
 
         // Listen cache event: "cache"(invalid cache item)
-        this.cacheEventListener = event -> {
+        EventListener listener = event -> {
             LOG.debug("Graph {} received graph cache event: {}",
                       this.graph(), event);
             Object[] args = event.args();
@@ -184,18 +195,52 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
             }
             return false;
         };
-        if (graphCacheListenStatus.putIfAbsent(this.params().spaceGraphName(), 
true) == null) {
-            EventHub graphEventHub = this.params().graphEventHub();
-            graphEventHub.listen(Events.CACHE, this.cacheEventListener);
-        }
+        EventHub graphEventHub = this.params().graphEventHub();
+        String graphName = this.params().spaceGraphName();
+        CacheListenerHolder acquired = GRAPH_CACHE_EVENT_LISTENERS.compute(
+                graphName, (key, existing) -> {
+                    if (existing == null || existing.hub != graphEventHub) {
+                        // Graph close/reopen creates a new EventHub for the
+                        // same graph name; replace the stale holder. Old
+                        // transactions skip decrement via identity check.
+                        if (existing != null) {
+                            existing.hub.unlisten(Events.CACHE,
+                                                  existing.listener);
+                        }
+                        graphEventHub.listen(Events.CACHE, listener);
+                        return new CacheListenerHolder(listener, 
graphEventHub);
+                    }
+                    existing.refCount++;
+                    return existing;
+                });
+        this.holder = acquired;
+        this.cacheEventListener = acquired.listener;
     }
 
     private void unlistenChanges() {
         String graphName = this.params().spaceGraphName();
-        if (graphCacheListenStatus.remove(graphName) != null) {
-            EventHub graphEventHub = this.params().graphEventHub();
-            graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+        CacheListenerHolder ours = this.holder;
+        if (ours != null) {
+            GRAPH_CACHE_EVENT_LISTENERS.compute(graphName, (key, existing) -> {
+                if (existing == null || existing != ours) {
+                    return existing;
+                }
+                existing.refCount--;
+                if (existing.refCount == 0) {
+                    existing.hub.unlisten(Events.CACHE, existing.listener);
+                    return null;
+                }
+                return existing;
+            });
+            this.holder = null;
+            this.cacheEventListener = null;
         }
+        // TODO (follow-up): storeEventListenStatus has the same owner-first
+        // close bug this PR fixes for GRAPH_CACHE_EVENT_LISTENERS. A non-owner
+        // transaction can remove the tracking entry, unlisten its own
+        // never-registered storeEventListener as a no-op, and leave the
+        // original store listener registered but untracked. Apply the same
+        // ref-counted holder pattern in a follow-up PR.
         if (storeEventListenStatus.remove(graphName) != null) {
             this.store().provider().unlisten(this.storeEventListener);
         }
@@ -203,12 +248,14 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
 
     private void notifyChanges(String action, HugeType type, Id[] ids) {
         EventHub graphEventHub = this.params().graphEventHub();
-        graphEventHub.notify(Events.CACHE, action, type, ids);
+        graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                   action, type, ids);
     }
 
     private void notifyChanges(String action, HugeType type) {
         EventHub graphEventHub = this.params().graphEventHub();
-        graphEventHub.notify(Events.CACHE, action, type);
+        graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                   action, type);
     }
 
     public void clearCache(HugeType type, boolean notify) {
@@ -220,7 +267,7 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
         }
 
         if (notify) {
-            this.notifyChanges(Cache.ACTION_CLEARED, null);
+            this.notifyChanges(Cache.ACTION_CLEAR, null);
         }
     }
 
@@ -397,7 +444,7 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
                     this.verticesCache.invalidate(vertex.id());
                 }
                 if (vertexOffset > 0) {
-                    this.notifyChanges(Cache.ACTION_INVALIDED,
+                    this.notifyChanges(Cache.ACTION_INVALID,
                                        HugeType.VERTEX, vertexIds);
                 }
             }
@@ -411,7 +458,7 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
             if (invalidEdgesCache && this.enableCacheEdge()) {
                 // TODO: Use a more precise strategy to update the edge cache
                 this.edgesCache.clear();
-                this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
+                this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
             }
         }
     }
@@ -425,7 +472,7 @@ public final class CachedGraphTransaction extends 
GraphTransaction {
             if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
                 // TODO: Use a more precise strategy to update the edge cache
                 this.edgesCache.clear();
-                this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
+                this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
             }
         }
     }
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
index 20a355e87..d393be746 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeGraphParams;
@@ -42,6 +43,14 @@ import com.google.common.collect.ImmutableSet;
 
 public final class CachedSchemaTransaction extends SchemaTransaction {
 
+    /*
+     * Listener lifetime must cover all active transactions for the graph.
+     * The holder is removed from the registry and unregistered from EventHub
+     * only when the last transaction releases it.
+     */
+    private static final ConcurrentMap<String, CacheListenerHolder>
+            SCHEMA_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
+
     private final Cache<Id, Object> idCache;
     private final Cache<Id, Object> nameCache;
 
@@ -49,6 +58,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
 
     private EventListener storeEventListener;
     private EventListener cacheEventListener;
+    private CacheListenerHolder holder;
 
     public CachedSchemaTransaction(HugeGraphParams graph, BackendStore store) {
         super(graph, store);
@@ -111,7 +121,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
         this.store().provider().listen(this.storeEventListener);
 
         // Listen cache event: "cache"(invalid cache item)
-        this.cacheEventListener = event -> {
+        EventListener listener = event -> {
             LOG.debug("Graph {} received schema cache event: {}",
                       this.graph(), event);
             Object[] args = event.args();
@@ -132,9 +142,26 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
             return false;
         };
         EventHub schemaEventHub = this.params().schemaEventHub();
-        if (!schemaEventHub.containsListener(Events.CACHE)) {
-            schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
-        }
+        String graph = this.params().spaceGraphName();
+        CacheListenerHolder acquired = SCHEMA_CACHE_EVENT_LISTENERS.compute(
+                graph, (key, existing) -> {
+                    if (existing == null || existing.hub != schemaEventHub) {
+                        // Graph close/reopen creates a new EventHub for the
+                        // same graph name; replace the stale holder. Old
+                        // transactions skip decrement via identity check.
+                        if (existing != null) {
+                            existing.hub.unlisten(Events.CACHE,
+                                                  existing.listener);
+                        }
+                        schemaEventHub.listen(Events.CACHE, listener);
+                        return new CacheListenerHolder(listener,
+                                                       schemaEventHub);
+                    }
+                    existing.refCount++;
+                    return existing;
+                });
+        this.holder = acquired;
+        this.cacheEventListener = acquired.listener;
     }
 
     private void unlistenChanges() {
@@ -142,18 +169,36 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
         this.store().provider().unlisten(this.storeEventListener);
 
         // Unlisten cache event
-        EventHub schemaEventHub = this.params().schemaEventHub();
-        schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
+        CacheListenerHolder ours = this.holder;
+        if (ours != null) {
+            SCHEMA_CACHE_EVENT_LISTENERS.compute(
+                    this.params().spaceGraphName(), (key, existing) -> {
+                        if (existing == null || existing != ours) {
+                            return existing;
+                        }
+                        existing.refCount--;
+                        if (existing.refCount == 0) {
+                            existing.hub.unlisten(Events.CACHE,
+                                                  existing.listener);
+                            return null;
+                        }
+                        return existing;
+                    });
+            this.holder = null;
+            this.cacheEventListener = null;
+        }
     }
 
     private void notifyChanges(String action, HugeType type, Id id) {
         EventHub graphEventHub = this.params().schemaEventHub();
-        graphEventHub.notify(Events.CACHE, action, type, id);
+        graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                   action, type, id);
     }
 
     private void notifyChanges(String action, HugeType type) {
         EventHub graphEventHub = this.params().schemaEventHub();
-        graphEventHub.notify(Events.CACHE, action, type);
+        graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
+                                   action, type);
     }
 
     private void resetCachedAll(HugeType type) {
@@ -179,7 +224,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
         this.arrayCaches.clear();
 
         if (notify) {
-            this.notifyChanges(Cache.ACTION_CLEARED, null);
+            this.notifyChanges(Cache.ACTION_CLEAR, null);
         }
     }
 
@@ -221,7 +266,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
 
         this.updateCache(schema);
 
-        this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+        this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
     }
 
     @Override
@@ -230,7 +275,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
 
         this.updateCache(schema);
 
-        this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+        this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
     }
 
     @Override
@@ -283,7 +328,7 @@ public final class CachedSchemaTransaction extends 
SchemaTransaction {
 
         this.invalidateCache(schema.type(), schema.id());
 
-        this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
+        this.notifyChanges(Cache.ACTION_INVALID, schema.type(), schema.id());
     }
 
     @Override
diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
index 763ccaa0e..2910958db 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java
@@ -140,8 +140,6 @@ public class GraphTransaction extends IndexableTransaction {
 
     private final int verticesCapacity;
     private final int edgesCapacity;
-    protected static final ConcurrentHashMap<String, Boolean> 
graphCacheListenStatus =
-            new ConcurrentHashMap<>();
     protected static final ConcurrentHashMap<String, Boolean> 
storeEventListenStatus =
             new ConcurrentHashMap<>();
 
diff --git 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
index a525bfcb9..8d7ef2166 100644
--- 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
+++ 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CacheManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.unit.cache;
 
 import java.lang.reflect.Proxy;
 import java.util.Map;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hugegraph.backend.cache.Cache;
@@ -271,6 +272,8 @@ public class CacheManagerTest extends BaseUnitTest {
 
     @Test
     public void testCacheExpire() {
+        CacheManager manager = CacheManager.instance();
+
         Cache<Id, Object> cache1 = new RamCache();
         cache1.expire(26 * 1000L);
 
@@ -285,17 +288,23 @@ public class CacheManagerTest extends BaseUnitTest {
                                                                 mockCache2);
         Mockito.when(this.mockCaches.entrySet()).thenReturn(caches.entrySet());
 
-        cache1.update(IdGenerator.of("fake-id"), "fake-value");
-        cache2.update(IdGenerator.of("fake-id"), "fake-value");
+        TimerTask task = Whitebox.invoke(CacheManager.class, "scheduleTimer",
+                                         manager, 0.1F);
+        try {
+            cache1.update(IdGenerator.of("fake-id"), "fake-value",
+                          -30 * 1000L);
+            cache2.update(IdGenerator.of("fake-id"), "fake-value");
 
-        waitTillNext(40);
+            waitTillNext(3);
 
-        // Would call tick() per 30s
-        Mockito.verify(mockCache1, Mockito.times(1)).tick();
-        Mockito.verify(mockCache2, Mockito.times(1)).tick();
+            Mockito.verify(mockCache1, Mockito.atLeastOnce()).tick();
+            Mockito.verify(mockCache2, Mockito.atLeastOnce()).tick();
 
-        Assert.assertEquals(0, cache1.size());
-        Assert.assertEquals(1, cache2.size());
+            Assert.assertEquals(0, cache1.size());
+            Assert.assertEquals(1, cache2.size());
+        } finally {
+            task.cancel();
+        }
     }
 
     @SuppressWarnings({"unused", "unchecked"})
@@ -308,4 +317,3 @@ public class CacheManagerTest extends BaseUnitTest {
         return (Cache<Id, Object>) p;
     }
 }
-
diff --git 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
index 4a1c18dbe..7bcc1a7fe 100644
--- 
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
+++ 
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java
@@ -17,12 +17,21 @@
 
 package org.apache.hugegraph.unit.cache;
 
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hugegraph.HugeFactory;
 import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.cache.Cache;
 import org.apache.hugegraph.backend.cache.CachedGraphTransaction;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.event.EventListener;
 import org.apache.hugegraph.schema.VertexLabel;
 import org.apache.hugegraph.structure.HugeEdge;
 import org.apache.hugegraph.structure.HugeVertex;
@@ -42,19 +51,30 @@ public class CachedGraphTransactionTest extends BaseUnitTest {
 
     private CachedGraphTransaction cache;
     private HugeGraphParams params;
+    private HugeGraph graph;
 
     @Before
     public void setup() {
-        HugeGraph graph = HugeFactory.open(FakeObjects.newConfig());
-        this.params = Whitebox.getInternalState(graph, "params");
+        this.graph = HugeFactory.open(FakeObjects.newConfig());
+        this.params = Whitebox.getInternalState(this.graph, "params");
         this.cache = new CachedGraphTransaction(this.params,
                                                 this.params.loadGraphStore());
     }
 
     @After
     public void teardown() throws Exception {
-        this.cache().graph().clearBackend();
-        this.cache().graph().close();
+        try {
+            if (this.cache != null) {
+                this.cache.close();
+            }
+        } finally {
+            this.cache = null;
+            if (this.graph != null) {
+                this.graph.clearBackend();
+                this.graph.close();
+                this.graph = null;
+            }
+        }
     }
 
     private CachedGraphTransaction cache() {
@@ -62,6 +82,42 @@ public class CachedGraphTransactionTest extends BaseUnitTest {
         return this.cache;
     }
 
+    @SuppressWarnings("unchecked")
+    private static ConcurrentMap<String, Object> graphCacheEventListeners()
+            throws Exception {
+        Field field = CachedGraphTransaction.class
+                                            .getDeclaredField(
+                                                    "GRAPH_CACHE_EVENT_LISTENERS");
+        field.setAccessible(true);
+        return (ConcurrentMap<String, Object>) field.get(null);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static ConcurrentMap<String, Boolean> storeEventListenStatus()
+            throws Exception {
+        Field field = GraphTransaction.class
+                                      .getDeclaredField("storeEventListenStatus");
+        field.setAccessible(true);
+        return (ConcurrentMap<String, Boolean>) field.get(null);
+    }
+
+    private static void restoreStoreListenerStatusForKnownTeardownBug(
+            ConcurrentMap<String, Boolean> storeListeners, String graphName) {
+        // Closing a secondary transaction can consume storeEventListenStatus due
+        // to the follow-up bug documented in CachedGraphTransaction.unlistenChanges().
+        // Restore it so teardown can still unregister the primary store listener.
+        storeListeners.putIfAbsent(graphName, true);
+    }
+
+    private static EventListener holderListener(Object holder) {
+        return Whitebox.getInternalState(holder, "listener");
+    }
+
+    private static int holderRefCount(Object holder) {
+        Integer refCount = Whitebox.getInternalState(holder, "refCount");
+        return refCount;
+    }
+
     private HugeVertex newVertex(Id id) {
         HugeGraph graph = this.cache().graph();
         graph.schema().propertyKey("name").asText()
@@ -138,6 +194,174 @@ public class CachedGraphTransactionTest extends BaseUnitTest {
                             Whitebox.invoke(cache, "verticesCache", "size"));
     }
 
+    @Test
+    public void testClearCacheEmitsActionClear() throws Exception {
+        // Producers must emit the present-tense ACTION_CLEAR / ACTION_INVALID,
+        // not the legacy past-tense variants - otherwise local listeners that
+        // match only the present-tense actions silently drop the event.
+        CachedGraphTransaction cache = this.cache();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<String> action = new AtomicReference<>();
+        EventListener listener = event -> {
+            Object[] args = event.args();
+            if (args.length > 0 && args[0] instanceof String) {
+                action.set((String) args[0]);
+                latch.countDown();
+            }
+            return true;
+        };
+        this.params.graphEventHub().listen(Events.CACHE, listener);
+        try {
+            cache.clearCache(HugeType.VERTEX, true);
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(Cache.ACTION_CLEAR, action.get());
+        } finally {
+            this.params.graphEventHub().unlisten(Events.CACHE, listener);
+        }
+    }
+
+    @Test
+    public void testVertexMutationEmitsActionInvalid() throws Exception {
+        CachedGraphTransaction cache = this.cache();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<String> action = new AtomicReference<>();
+        EventListener listener = event -> {
+            Object[] args = event.args();
+            if (args.length > 0 && Cache.ACTION_INVALID.equals(args[0])) {
+                action.set((String) args[0]);
+                latch.countDown();
+            }
+            return true;
+        };
+        this.params.graphEventHub().listen(Events.CACHE, listener);
+        try {
+            cache.addVertex(this.newVertex(IdGenerator.of(1)));
+            cache.commit();
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+        } finally {
+            this.params.graphEventHub().unlisten(Events.CACHE, listener);
+        }
+    }
+
+    @Test
+    public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
+            throws Exception {
+        ConcurrentMap<String, Object> cacheListeners =
+                graphCacheEventListeners();
+        ConcurrentMap<String, Boolean> storeListeners =
+                storeEventListenStatus();
+
+        String graphName = this.params.spaceGraphName();
+        Object holder = cacheListeners.get(graphName);
+        Assert.assertNotNull(holder);
+        EventListener registered = holderListener(holder);
+        int refCount = holderRefCount(holder);
+
+        CachedGraphTransaction second = new CachedGraphTransaction(
+                this.params, this.params.loadGraphStore());
+        Assert.assertSame(holder, cacheListeners.get(graphName));
+        Assert.assertEquals(refCount + 1, holderRefCount(holder));
+
+        try {
+            second.close();
+
+            Assert.assertSame(holder, cacheListeners.get(graphName));
+            Assert.assertEquals(refCount, holderRefCount(holder));
+            Assert.assertTrue(this.params.graphEventHub()
+                                         .listeners(Events.CACHE)
+                                         .contains(registered));
+        } finally {
+            restoreStoreListenerStatusForKnownTeardownBug(storeListeners,
+                                                           graphName);
+        }
+    }
+
+    @Test
+    public void testCacheListenerSurvivesOwnerClose() throws Exception {
+        ConcurrentMap<String, Object> cacheListeners =
+                graphCacheEventListeners();
+        String graphName = this.params.spaceGraphName();
+        CachedGraphTransaction owner = this.cache();
+        CachedGraphTransaction second = new CachedGraphTransaction(
+                this.params, this.params.loadGraphStore());
+
+        Object holder = cacheListeners.get(graphName);
+        Assert.assertNotNull(holder);
+        EventListener registered = holderListener(holder);
+        int refCount = holderRefCount(holder);
+        Assert.assertTrue(refCount >= 2);
+
+        owner.close();
+        this.cache = second;
+
+        Assert.assertSame(holder, cacheListeners.get(graphName));
+        Assert.assertEquals(refCount - 1, holderRefCount(holder));
+        Assert.assertTrue(this.params.graphEventHub()
+                                     .listeners(Events.CACHE)
+                                     .contains(registered));
+
+        second.addVertex(this.newVertex(IdGenerator.of(1)));
+        second.addVertex(this.newVertex(IdGenerator.of(2)));
+        second.commit();
+        Assert.assertTrue(second.queryVertices(IdGenerator.of(1)).hasNext());
+        Assert.assertTrue(second.queryVertices(IdGenerator.of(2)).hasNext());
+        Assert.assertEquals(2L,
+                            Whitebox.invoke(second, "verticesCache", "size"));
+
+        this.params.graphEventHub().notify(Events.CACHE, Cache.ACTION_INVALID,
+                                           HugeType.VERTEX, IdGenerator.of(1))
+                   .get();
+
+        Assert.assertEquals(1L,
+                            Whitebox.invoke(second, "verticesCache", "size"));
+    }
+
+    @Test
+    public void testLastCloseRemovesGraphCacheListener() throws Exception {
+        ConcurrentMap<String, Object> cacheListeners =
+                graphCacheEventListeners();
+        String graphName = this.params.spaceGraphName();
+        CachedGraphTransaction owner = this.cache();
+        CachedGraphTransaction second = new CachedGraphTransaction(
+                this.params, this.params.loadGraphStore());
+
+        Object holder = cacheListeners.get(graphName);
+        Assert.assertNotNull(holder);
+        EventListener registered = holderListener(holder);
+        Assert.assertTrue(holderRefCount(holder) >= 2);
+
+        owner.close();
+        second.close();
+        this.cache = null;
+        this.params.graphTransaction().close();
+
+        Assert.assertFalse(cacheListeners.containsKey(graphName));
+        Assert.assertFalse(this.params.graphEventHub()
+                                      .listeners(Events.CACHE)
+                                      .contains(registered));
+
+        this.graph.clearBackend();
+        this.graph.close();
+        this.graph = null;
+
+        HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig());
+        this.graph = reopened;
+        this.params = Whitebox.getInternalState(reopened, "params");
+        Object reopenedHolder = cacheListeners.get(graphName);
+        Assert.assertNotNull(reopenedHolder);
+        Assert.assertNotSame(holder, reopenedHolder);
+        int reopenedRefCount = holderRefCount(reopenedHolder);
+        CachedGraphTransaction third = new CachedGraphTransaction(
+                this.params, this.params.loadGraphStore());
+        this.cache = third;
+        Object newHolder = cacheListeners.get(graphName);
+        Assert.assertSame(reopenedHolder, newHolder);
+        Assert.assertEquals(reopenedRefCount + 1, holderRefCount(newHolder));
+    }
+
     @Test
     public void testEdgeCacheClearWhenDeleteVertex() {
         CachedGraphTransaction cache = this.cache();
diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
index efb63fad2..9921670ee 100644
--- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
+++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
@@ -23,21 +23,26 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeFactory;
 import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.HugeGraphParams;
 import org.apache.hugegraph.backend.cache.Cache;
+import org.apache.hugegraph.backend.cache.CacheNotifier;
 import org.apache.hugegraph.backend.cache.CacheManager;
 import org.apache.hugegraph.backend.cache.CachedSchemaTransaction;
 import org.apache.hugegraph.backend.cache.CachedSchemaTransactionV2;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.event.EventHub;
+import org.apache.hugegraph.event.EventListener;
 import org.apache.hugegraph.meta.MetaDriver;
 import org.apache.hugegraph.meta.MetaManager;
 import org.apache.hugegraph.meta.managers.GraphMetaManager;
@@ -60,19 +65,30 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
 
     private CachedSchemaTransaction cache;
     private HugeGraphParams params;
+    private HugeGraph graph;
 
     @Before
     public void setup() {
-        HugeGraph graph = HugeFactory.open(FakeObjects.newConfig());
-        this.params = Whitebox.getInternalState(graph, "params");
+        this.graph = HugeFactory.open(FakeObjects.newConfig());
+        this.params = Whitebox.getInternalState(this.graph, "params");
         this.cache = new CachedSchemaTransaction(this.params,
                                                  this.params.loadSchemaStore());
     }
 
     @After
     public void teardown() throws Exception {
-        this.cache().graph().clearBackend();
-        this.cache().graph().close();
+        try {
+            if (this.cache != null) {
+                this.cache.close();
+            }
+        } finally {
+            this.cache = null;
+            if (this.graph != null) {
+                this.graph.clearBackend();
+                this.graph.close();
+                this.graph = null;
+            }
+        }
     }
 
     private CachedSchemaTransaction cache() {
@@ -80,6 +96,25 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
         return this.cache;
     }
 
+    @SuppressWarnings("unchecked")
+    private static ConcurrentMap<String, Object> schemaCacheEventListeners()
+            throws Exception {
+        Field field = CachedSchemaTransaction.class
+                                             .getDeclaredField(
+                                                     "SCHEMA_CACHE_EVENT_LISTENERS");
+        field.setAccessible(true);
+        return (ConcurrentMap<String, Object>) field.get(null);
+    }
+
+    private static EventListener holderListener(Object holder) {
+        return Whitebox.getInternalState(holder, "listener");
+    }
+
+    private static int holderRefCount(Object holder) {
+        Integer refCount = Whitebox.getInternalState(holder, "refCount");
+        return refCount;
+    }
+
     @Test
     public void testEventClear() throws Exception {
         CachedSchemaTransaction cache = this.cache();
@@ -187,6 +222,176 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
                             cache.getPropertyKey(IdGenerator.of(1)).name());
     }
 
+    @Test
+    public void testLegacySchemaChangeEmitsActionInvalid()
+            throws Exception {
+        CachedSchemaTransaction cache = this.cache();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<String> action = new AtomicReference<>();
+        EventListener listener = event -> {
+            Object[] args = event.args();
+            if (args.length > 0) {
+                action.set((String) args[0]);
+                latch.countDown();
+            }
+            return true;
+        };
+        this.params.schemaEventHub().listen(Events.CACHE, listener);
+
+        try {
+            FakeObjects objects = new FakeObjects("unit-test");
+            cache.addPropertyKey(objects.newPropertyKey(IdGenerator.of(1),
+                                                        "fake-pk-1"));
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(Cache.ACTION_INVALID, action.get());
+            Assert.assertEquals(1L, Whitebox.invoke(cache, "idCache", "size"));
+            Assert.assertEquals(1L,
+                                Whitebox.invoke(cache, "nameCache", "size"));
+            Assert.assertEquals("fake-pk-1",
+                                cache.getPropertyKey(IdGenerator.of(1))
+                                     .name());
+            Assert.assertEquals(IdGenerator.of(1),
+                                cache.getPropertyKey("fake-pk-1").id());
+        } finally {
+            this.params.schemaEventHub().unlisten(Events.CACHE, listener);
+        }
+    }
+
+    @Test
+    public void testCacheListenerSurvivesOwnerClose() throws Exception {
+        ConcurrentMap<String, Object> registry = schemaCacheEventListeners();
+        String graphName = this.params.spaceGraphName();
+        CachedSchemaTransaction owner = this.cache();
+        CachedSchemaTransaction second = new CachedSchemaTransaction(
+                this.params, this.params.loadSchemaStore());
+
+        Object holder = registry.get(graphName);
+        Assert.assertNotNull(holder);
+        EventListener registered = holderListener(holder);
+        int refCount = holderRefCount(holder);
+        Assert.assertTrue(refCount >= 2);
+
+        owner.close();
+        this.cache = second;
+
+        Assert.assertSame(holder, registry.get(graphName));
+        Assert.assertEquals(refCount - 1, holderRefCount(holder));
+        Assert.assertTrue(this.params.schemaEventHub()
+                                     .listeners(Events.CACHE)
+                                     .contains(registered));
+
+        FakeObjects objects = new FakeObjects("unit-test");
+        second.addPropertyKey(objects.newPropertyKey(IdGenerator.of(1),
+                                                     "fake-pk-1"));
+        second.addPropertyKey(objects.newPropertyKey(IdGenerator.of(2),
+                                                     "fake-pk-2"));
+        Assert.assertEquals(2L, Whitebox.invoke(second, "idCache", "size"));
+        Assert.assertEquals(2L, Whitebox.invoke(second, "nameCache", "size"));
+
+        this.params.schemaEventHub().notify(Events.CACHE, Cache.ACTION_CLEAR,
+                                            null).get();
+
+        Assert.assertEquals(0L, Whitebox.invoke(second, "idCache", "size"));
+        Assert.assertEquals(0L, Whitebox.invoke(second, "nameCache", "size"));
+    }
+
+    @Test
+    public void testLastCloseRemovesSchemaCacheListener() throws Exception {
+        ConcurrentMap<String, Object> registry = schemaCacheEventListeners();
+        String graphName = this.params.spaceGraphName();
+        CachedSchemaTransaction owner = this.cache();
+        CachedSchemaTransaction second = new CachedSchemaTransaction(
+                this.params, this.params.loadSchemaStore());
+
+        Object holder = registry.get(graphName);
+        Assert.assertNotNull(holder);
+        EventListener registered = holderListener(holder);
+        Assert.assertTrue(holderRefCount(holder) >= 2);
+
+        owner.close();
+        second.close();
+        this.cache = null;
+        this.params.schemaTransaction().close();
+
+        Assert.assertFalse(registry.containsKey(graphName));
+        Assert.assertFalse(this.params.schemaEventHub()
+                                      .listeners(Events.CACHE)
+                                      .contains(registered));
+
+        this.graph.clearBackend();
+        this.graph.close();
+        this.graph = null;
+
+        HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig());
+        this.graph = reopened;
+        this.params = Whitebox.getInternalState(reopened, "params");
+        Object reopenedHolder = registry.get(graphName);
+        Assert.assertNotNull(reopenedHolder);
+        Assert.assertNotSame(holder, reopenedHolder);
+        int reopenedRefCount = holderRefCount(reopenedHolder);
+        CachedSchemaTransaction third = new CachedSchemaTransaction(
+                this.params, this.params.loadSchemaStore());
+        this.cache = third;
+        Object newHolder = registry.get(graphName);
+        Assert.assertSame(reopenedHolder, newHolder);
+        Assert.assertEquals(reopenedRefCount + 1, holderRefCount(newHolder));
+    }
+
+    @Test
+    public void testCacheNotifierLocalEventCallsProxyOnce()
+            throws Exception {
+        EventHub hub = new EventHub("schema-cache-notifier-test");
+        AtomicInteger proxyInvalidCalls = new AtomicInteger();
+        CacheNotifier proxy = newCacheNotifier(proxyInvalidCalls,
+                                               new AtomicInteger(),
+                                               new AtomicInteger());
+        CacheNotifier notifier = newSchemaCacheNotifier(hub, proxy);
+
+        try {
+            Assert.assertEquals(1, (int) hub.notify(Events.CACHE,
+                                                    Cache.ACTION_INVALID,
+                                                    HugeType.PROPERTY_KEY,
+                                                    IdGenerator.of(1))
+                                           .get());
+            Assert.assertEquals(1, proxyInvalidCalls.get());
+        } finally {
+            notifier.close();
+        }
+    }
+
+    @Test
+    public void testCacheNotifierRpcInvalidDoesNotLoopToProxy()
+            throws Exception {
+        EventHub hub = new EventHub("schema-cache-notifier-test");
+        AtomicInteger proxyInvalidCalls = new AtomicInteger();
+        AtomicInteger localInvalidCalls = new AtomicInteger();
+        CountDownLatch latch = new CountDownLatch(1);
+        CacheNotifier proxy = newCacheNotifier(proxyInvalidCalls,
+                                               new AtomicInteger(),
+                                               new AtomicInteger());
+        CacheNotifier notifier = newSchemaCacheNotifier(hub, proxy);
+        EventListener localListener = event -> {
+            event.checkArgs(String.class, HugeType.class, Id.class);
+            if (Cache.ACTION_INVALID.equals(event.args()[0])) {
+                localInvalidCalls.incrementAndGet();
+                latch.countDown();
+            }
+            return true;
+        };
+        hub.listen(Events.CACHE, localListener);
+
+        try {
+            notifier.invalid(HugeType.PROPERTY_KEY, IdGenerator.of(1));
+
+            Assert.assertTrue(latch.await(1L, TimeUnit.SECONDS));
+            Assert.assertEquals(1, localInvalidCalls.get());
+            Assert.assertEquals(0, proxyInvalidCalls.get());
+        } finally {
+            notifier.close();
+        }
+    }
+
     @Test
     public void testClearV2SchemaCacheByGraphName() {
         String graphName = "DEFAULT-unit-test-v2";
@@ -684,6 +889,50 @@ public class CachedSchemaTransactionTest extends BaseUnitTest {
         return previous;
     }
 
+    private static CacheNotifier newSchemaCacheNotifier(EventHub hub,
+                                                        CacheNotifier proxy)
+                                                        throws Exception {
+        Class<?> clazz = Class.forName("org.apache.hugegraph." +
+                                       "StandardHugeGraph$" +
+                                       "HugeSchemaCacheNotifier");
+        Constructor<?> constructor = clazz.getDeclaredConstructor(
+                EventHub.class, CacheNotifier.class);
+        constructor.setAccessible(true);
+        return (CacheNotifier) constructor.newInstance(hub, proxy);
+    }
+
+    private static CacheNotifier newCacheNotifier(AtomicInteger invalidCalls,
+                                                  AtomicInteger invalid2Calls,
+                                                  AtomicInteger clearCalls) {
+        return new CacheNotifier() {
+
+            @Override
+            public void invalid(HugeType type, Id id) {
+                invalidCalls.incrementAndGet();
+            }
+
+            @Override
+            public void invalid2(HugeType type, Object[] ids) {
+                invalid2Calls.incrementAndGet();
+            }
+
+            @Override
+            public void clear(HugeType type) {
+                clearCalls.incrementAndGet();
+            }
+
+            @Override
+            public void reload() {
+                // pass
+            }
+
+            @Override
+            public void close() {
+                // pass
+            }
+        };
+    }
+
     @SuppressWarnings("unchecked")
     private static <T> T readField(Object target, String name)
             throws ReflectiveOperationException {

Reply via email to