This is an automated email from the ASF dual-hosted git repository.
imbajin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new ee177a0f7 fix(server): sync hstore schema cache clears (#3011)
ee177a0f7 is described below
commit ee177a0f707ef2e9d4dcc4caf5e90e7ec378ac24
Author: Davide Polato <[email protected]>
AuthorDate: Wed May 6 14:59:36 2026 +0200
fix(server): sync hstore schema cache clears (#3011)
Register a JVM-wide MetaManager listener on CachedSchemaTransactionV2 so
remote nodes clear their V2 schema-id/schema-name caches and the array
attachment on schema add/remove. Events carry a per-JVM source id to skip
self-echo;
legacy plain-string payloads still accepted for safe rolling
upgrades. Status transitions no longer broadcast to avoid notify storms
from background rebuild/remove jobs.
---
.../backend/cache/CachedSchemaTransactionV2.java | 169 ++++++-
.../org/apache/hugegraph/meta/MetaManager.java | 95 +++-
.../hugegraph/meta/managers/GraphMetaManager.java | 9 +-
.../meta/MetaManagerSchemaCacheClearEventTest.java | 74 +++
.../org/apache/hugegraph/unit/UnitTestSuite.java | 2 +
.../unit/cache/CachedSchemaTransactionTest.java | 544 +++++++++++++++++++++
6 files changed, 878 insertions(+), 15 deletions(-)
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
index c335d50f0..d6fbe9796 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java
@@ -19,8 +19,11 @@ package org.apache.hugegraph.backend.cache;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeGraphParams;
@@ -33,6 +36,7 @@ import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.meta.MetaDriver;
import org.apache.hugegraph.meta.MetaManager;
+import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent;
import org.apache.hugegraph.perf.PerfUtil;
import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.type.HugeType;
@@ -43,6 +47,29 @@ import com.google.common.collect.ImmutableSet;
public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
+ private static final String ID_CACHE_PREFIX = "schema-id";
+ private static final String NAME_CACHE_PREFIX = "schema-name";
+
+ // MetaDriver doesn't expose unlisten, register the meta listener once.
+ // Lifecycle: this JVM-global flag is intentionally never reset by
+ // unlistenChanges() (the underlying gRPC watch is process-wide). If that
+ // watch is silently dropped after a transport reconnect, recovery is not
+ // automatic; resetMetaListenerForReconnect() is only a manual hook to let
+ // the next schema operation install a fresh watch.
+ private static final AtomicBoolean metaEventListenerRegistered =
+ new AtomicBoolean(false);
+
+ private static final Object META_LISTENER_LOCK = new Object();
+
+ /**
+ * Per-JVM identifier emitted with every schema-cache-clear meta event so
+ * the listener can skip its own echo. Lifecycle: generated once per
+ * classloader at class init, never reused, regenerated on JVM restart.
+ * This is not a stable node identity, only a local self-echo filter.
+ */
+ private static final String SCHEMA_CACHE_CLEAR_SOURCE =
+ UUID.randomUUID().toString();
+
private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;
@@ -58,8 +85,8 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
final long capacity = graphParams.configuration()
.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
- this.idCache = this.cache("schema-id", capacity);
- this.nameCache = this.cache("schema-name", capacity);
+ this.idCache = this.cache(ID_CACHE_PREFIX, capacity);
+ this.nameCache = this.cache(NAME_CACHE_PREFIX, capacity);
SchemaCaches<SchemaElement> attachment = this.idCache.attachment();
if (attachment == null) {
@@ -86,11 +113,38 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
}
private Cache<Id, Object> cache(String prefix, long capacity) {
- final String name = prefix + "-" + this.graph().spaceGraphName();
+ final String name = cacheName(prefix, this.graph().spaceGraphName());
// NOTE: must disable schema cache-expire due to getAllSchema()
return CacheManager.instance().cache(name, capacity);
}
+ private static String cacheName(String prefix, String spaceGraphName) {
+ return prefix + "-" + spaceGraphName;
+ }
+
+ private static void clearSchemaCache(String spaceGraphName) {
+ Map<String, Cache<Id, Object>> caches =
CacheManager.instance().caches();
+
+ // Clear name cache first so the (name -> id -> object) lookup path
+ // fails fast instead of returning a stale object backed by an
+ // already-empty id cache during the TOCTOU window.
+ Cache<Id, Object> nameCache = caches.get(cacheName(NAME_CACHE_PREFIX,
+ spaceGraphName));
+ if (nameCache != null) {
+ nameCache.clear();
+ }
+
+ Cache<Id, Object> idCache = caches.get(cacheName(ID_CACHE_PREFIX,
+ spaceGraphName));
+ if (idCache != null) {
+ SchemaCaches<?> arrayCaches = idCache.attachment();
+ if (arrayCaches != null) {
+ arrayCaches.clear();
+ }
+ idCache.clear();
+ }
+ }
+
private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
@@ -100,7 +154,8 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear schema cache on event '{}'",
this.graph(), event.name());
- this.clearCache(true);
+ boolean notify = !Events.STORE_INIT.equals(event.name());
+ this.clearCache(notify);
return true;
}
return false;
@@ -145,12 +200,88 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
+
+ listenSchemaCacheClear();
+ }
+
+ private static void listenSchemaCacheClear() {
+ synchronized (META_LISTENER_LOCK) {
+ if (metaEventListenerRegistered.get()) {
+ return;
+ }
+ try {
+ MetaManager.instance().listenSchemaCacheClear(
+
CachedSchemaTransactionV2::handleSchemaCacheClearEvent);
+ // Set AFTER the underlying watch is live so a concurrent
+ // caller that observes the flag is guaranteed an active
+ // subscription, and a failure leaves the flag false so the
+ // next caller retries registration.
+ metaEventListenerRegistered.set(true);
+ } catch (Exception e) {
+ throw e instanceof RuntimeException
+ ? (RuntimeException) e
+ : new RuntimeException(
+ "Failed to register schema cache clear listener",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Consumer invoked by the MetaManager schema-cache-clear watch. Extracted
+ * as a package-private static method so end-to-end tests can drive the
+ * publish -> callback -> {@link #clearSchemaCache(String)} path without
+ * depending on a live etcd/PD watch.
+ */
+ static <T> void handleSchemaCacheClearEvent(T response) {
+ List<SchemaCacheClearEvent> events =
+ MetaManager.instance()
+ .extractSchemaCacheClearEventsFromResponse(
+ response);
+ if (events == null) {
+ return;
+ }
+ for (SchemaCacheClearEvent event : events) {
+ if (SCHEMA_CACHE_CLEAR_SOURCE.equals(event.source())) {
+ continue;
+ }
+ String graphName = event.graph();
+ LOG.debug("Graph {} clear schema cache on meta event", graphName);
+ clearSchemaCache(graphName);
+ }
+ }
+
+ /**
+ * Manually reset the JVM-global meta listener flag after detecting that
+ * the MetaManager transport reconnected and dropped the underlying gRPC
+ * watch. This method is not wired to a MetaManager/MetaDriver reconnect
+ * callback today; callers must invoke it explicitly after detecting that
+ * condition. Without such a manual reset {@link
#metaEventListenerRegistered}
+ * would stay {@code true} forever and this JVM would stop receiving
+ * cross-node schema cache clear events with no error or warning.
+ *
+ * <p>TODO: wire this into MetaManager once it exposes a transport
+ * reconnect callback (e.g. {@code listenReconnect} /
+ * {@code onTransportReconnect}). Until then it must be invoked
+ * explicitly by code that detects the reconnect.
+ */
+ public static void resetMetaListenerForReconnect() {
+ if (metaEventListenerRegistered.compareAndSet(true, false)) {
+ LOG.warn("Schema cache clear meta listener lost on reconnect - " +
+ "will re-register on next schema operation.");
+ }
}
public void clearCache(boolean notify) {
- this.idCache.clear();
+ // Same TOCTOU ordering as clearSchemaCache(String): clear nameCache
+ // first, then the array attachment, then idCache last.
this.nameCache.clear();
this.arrayCaches.clear();
+ this.idCache.clear();
+
+ if (notify) {
+ this.maybeNotifySchemaCacheClear();
+ }
}
private void resetCachedAllIfReachedCapacity() {
@@ -202,6 +333,8 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
super.updateSchema(schema, updateCallback);
this.updateCache(schema);
+ // Status transitions are internal bookkeeping; notifying here causes a
+ // broadcast storm for every updateSchemaStatus() call from background
jobs.
}
@Override
@@ -210,11 +343,9 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
this.updateCache(schema);
- if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
- MetaManager.instance()
- .notifySchemaCacheClear(this.graph().graphSpace(),
- this.graph().name());
- }
+ // Schema additions must always propagate to remote nodes regardless
+ // of TASK_SYNC_DELETION (which only gates removal flows).
+ this.notifySchemaCacheClear();
}
private void updateCache(SchemaElement schema) {
@@ -238,13 +369,25 @@ public class CachedSchemaTransactionV2 extends
SchemaTransactionV2 {
this.invalidateCache(schema.type(), schema.id());
+ this.maybeNotifySchemaCacheClear();
+ }
+
+ private void maybeNotifySchemaCacheClear() {
+ // Only suppress notifications for removal tasks when
+ // TASK_SYNC_DELETION=true: the caller propagates cache invalidation
+ // synchronously, so the meta-event broadcast would be redundant.
if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
- MetaManager.instance()
- .notifySchemaCacheClear(this.graph().graphSpace(),
- this.graph().name());
+ this.notifySchemaCacheClear();
}
}
+ private void notifySchemaCacheClear() {
+ MetaManager.instance()
+ .notifySchemaCacheClear(this.graph().graphSpace(),
+ this.graph().name(),
+ SCHEMA_CACHE_CLEAR_SOURCE);
+ }
+
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java
index 551b21997..6637baf22 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java
@@ -57,11 +57,16 @@ import org.apache.hugegraph.space.GraphSpace;
import org.apache.hugegraph.space.SchemaTemplate;
import org.apache.hugegraph.space.Service;
import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.JsonUtil;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
public class MetaManager {
+ private static final Logger LOG = Log.logger(MetaManager.class);
+
public static final String META_PATH_DELIMITER = "/";
public static final String META_PATH_JOIN = "-";
@@ -119,6 +124,8 @@ public class MetaManager {
public static final long LOCK_DEFAULT_LEASE = 30L;
public static final long LOCK_DEFAULT_TIMEOUT = 10L;
public static final int RANDOM_USER_ID = 100;
+ private static final String SCHEMA_CACHE_CLEAR_GRAPH_KEY = "graph";
+ private static final String SCHEMA_CACHE_CLEAR_SOURCE_KEY = "source";
private static final String META_PATH_URLS = "URLS";
private static final String META_PATH_PD_PEERS = "HSTORE_PD_PEERS";
private static final MetaManager INSTANCE = new MetaManager();
@@ -380,6 +387,23 @@ public class MetaManager {
return this.metaDriver.extractValuesFromResponse(response);
}
+ public <T> List<SchemaCacheClearEvent>
extractSchemaCacheClearEventsFromResponse(
+ T response) {
+ List<String> values =
this.metaDriver.extractValuesFromResponse(response);
+ if (values == null) {
+ return null;
+ }
+
+ List<SchemaCacheClearEvent> events = new ArrayList<>(values.size());
+ for (String value : values) {
+ SchemaCacheClearEvent event =
SchemaCacheClearEvent.fromValue(value);
+ if (event != null) {
+ events.add(event);
+ }
+ }
+ return events;
+ }
+
public <T> Map<String, String> extractKVFromResponse(T response) {
return this.metaDriver.extractKVFromResponse(response);
}
@@ -499,7 +523,12 @@ public class MetaManager {
}
public void notifySchemaCacheClear(String graphSpace, String graph) {
- this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph);
+ this.notifySchemaCacheClear(graphSpace, graph, null);
+ }
+
+ public void notifySchemaCacheClear(String graphSpace, String graph,
+ String source) {
+ this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph,
source);
}
public void notifyGraphCacheClear(String graphSpace, String graph) {
@@ -1287,6 +1316,70 @@ public class MetaManager {
this.metaDriver.put(key, ((Boolean) status).toString());
}
+ public static String schemaCacheClearEventValue(String graph,
+ String source) {
+ if (StringUtils.isEmpty(source)) {
+ return graph;
+ }
+ return JsonUtil.toJson(ImmutableMap.of(SCHEMA_CACHE_CLEAR_GRAPH_KEY,
+ graph,
+ SCHEMA_CACHE_CLEAR_SOURCE_KEY,
+ source));
+ }
+
+ public static final class SchemaCacheClearEvent {
+
+ private final String graph;
+ private final String source;
+
+ private SchemaCacheClearEvent(String graph, String source) {
+ this.graph = graph;
+ this.source = source;
+ }
+
+ public String graph() {
+ return this.graph;
+ }
+
+ public String source() {
+ return this.source;
+ }
+
+ @SuppressWarnings("unchecked")
+ static SchemaCacheClearEvent fromValue(String value) {
+ if (StringUtils.isEmpty(value)) {
+ return null;
+ }
+ // Compatibility: events published before source-id support stored
+ // only the graph name as a plain string. Keep accepting that
format
+ // so mixed-version clusters can consume old/new schema-cache-clear
+ // events during rolling upgrades.
+ if (value.charAt(0) != '{') {
+ return new SchemaCacheClearEvent(value, null);
+ }
+
+ Map<String, Object> payload;
+ try {
+ payload = JsonUtil.fromJson(value, Map.class);
+ } catch (RuntimeException e) {
+ LOG.debug("Malformed schema-cache-clear payload, ignoring: {}",
+ value, e);
+ return null;
+ }
+
+ Object graph = payload.get(SCHEMA_CACHE_CLEAR_GRAPH_KEY);
+ if (graph == null) {
+ LOG.debug("Schema-cache-clear payload missing '{}' field: {}",
+ SCHEMA_CACHE_CLEAR_GRAPH_KEY, value);
+ return null;
+ }
+
+ Object source = payload.get(SCHEMA_CACHE_CLEAR_SOURCE_KEY);
+ String sourceValue = source == null ? null : source.toString();
+ return new SchemaCacheClearEvent(graph.toString(), sourceValue);
+ }
+ }
+
public enum MetaDriverType {
ETCD,
PD
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java
index 8d00bfabb..52b2c3946 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java
@@ -33,6 +33,7 @@ import static
org.apache.hugegraph.meta.MetaManager.META_PATH_SCHEMA;
import static org.apache.hugegraph.meta.MetaManager.META_PATH_SYS_GRAPH_CONF;
import static org.apache.hugegraph.meta.MetaManager.META_PATH_UPDATE;
import static org.apache.hugegraph.meta.MetaManager.META_PATH_VERTEX_LABEL;
+import static org.apache.hugegraph.meta.MetaManager.schemaCacheClearEventValue;
import java.util.Map;
import java.util.function.Consumer;
@@ -94,8 +95,14 @@ public class GraphMetaManager extends AbstractMetaManager {
}
public void notifySchemaCacheClear(String graphSpace, String graph) {
+ this.notifySchemaCacheClear(graphSpace, graph, null);
+ }
+
+ public void notifySchemaCacheClear(String graphSpace, String graph,
+ String source) {
this.metaDriver.put(this.schemaCacheClearKey(),
- graphName(graphSpace, graph));
+ schemaCacheClearEventValue(
+ graphName(graphSpace, graph), source));
}
public void notifyGraphCacheClear(String graphSpace, String graph) {
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java
new file mode 100644
index 000000000..852d4eae0
--- /dev/null
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.meta;
+
+import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent;
+import org.apache.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class MetaManagerSchemaCacheClearEventTest {
+
+ @Test
+ public void testFromValueReturnsNullForEmptyPayload() {
+ Assert.assertNull(SchemaCacheClearEvent.fromValue(null));
+ Assert.assertNull(SchemaCacheClearEvent.fromValue(""));
+ }
+
+ @Test
+ public void testFromValueParsesLegacyPlainGraphName() {
+ SchemaCacheClearEvent event =
+ SchemaCacheClearEvent.fromValue("DEFAULT-graph1");
+
+ assertEvent(event, "DEFAULT-graph1", null);
+ }
+
+ @Test
+ public void testFromValueIgnoresMalformedJson() {
+ Assert.assertNull(SchemaCacheClearEvent.fromValue("{not-json"));
+ }
+
+ @Test
+ public void testFromValueParsesJsonWithSource() {
+ String value = MetaManager.schemaCacheClearEventValue("g", "u");
+ SchemaCacheClearEvent event = SchemaCacheClearEvent.fromValue(value);
+
+ assertEvent(event, "g", "u");
+ }
+
+ @Test
+ public void testFromValueParsesJsonWithoutSource() {
+ SchemaCacheClearEvent event =
+ SchemaCacheClearEvent.fromValue("{\"graph\":\"g\"}");
+
+ assertEvent(event, "g", null);
+ }
+
+ @Test
+ public void testFromValueIgnoresJsonWithoutGraph() {
+ Assert.assertNull(
+ SchemaCacheClearEvent.fromValue("{\"source\":\"u\"}"));
+ }
+
+ private static void assertEvent(SchemaCacheClearEvent event,
+ String graph,
+ String source) {
+ Assert.assertNotNull(event);
+ Assert.assertEquals(graph, event.graph());
+ Assert.assertEquals(source, event.source());
+ }
+}
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java
index 4a62e48bb..ce249a967 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.hugegraph.unit;
import org.apache.hugegraph.core.RoleElectionStateMachineTest;
+import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest;
import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest;
import org.apache.hugegraph.unit.api.filter.PathFilterTest;
import org.apache.hugegraph.unit.api.gremlin.GremlinQueryAPITest;
@@ -92,6 +93,7 @@ import org.junit.runners.Suite;
CacheTest.OffheapCacheTest.class,
CacheTest.LevelCacheTest.class,
CachedSchemaTransactionTest.class,
+ MetaManagerSchemaCacheClearEventTest.class,
CachedGraphTransactionTest.class,
CacheManagerTest.class,
RamTableTest.class,
diff --git
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
index 83ba5097f..efb63fad2 100644
---
a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
+++
b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java
@@ -17,11 +17,31 @@
package org.apache.hugegraph.unit.cache;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.cache.Cache;
+import org.apache.hugegraph.backend.cache.CacheManager;
import org.apache.hugegraph.backend.cache.CachedSchemaTransaction;
+import org.apache.hugegraph.backend.cache.CachedSchemaTransactionV2;
+import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
+import org.apache.hugegraph.meta.MetaDriver;
+import org.apache.hugegraph.meta.MetaManager;
+import org.apache.hugegraph.meta.managers.GraphMetaManager;
+import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.type.HugeType;
@@ -31,6 +51,8 @@ import org.apache.hugegraph.util.Events;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
@@ -165,6 +187,528 @@ public class CachedSchemaTransactionTest extends
BaseUnitTest {
cache.getPropertyKey(IdGenerator.of(1)).name());
}
+ @Test
+ public void testClearV2SchemaCacheByGraphName() {
+ String graphName = "DEFAULT-unit-test-v2";
+ String otherGraphName = "DEFAULT-other-v2";
+
+ Cache<Id, Object> idCache = CacheManager.instance()
+ .cache("schema-id-" +
+ graphName, 10L);
+ Cache<Id, Object> nameCache = CacheManager.instance()
+ .cache("schema-name-" +
+ graphName, 10L);
+ Cache<Id, Object> otherIdCache = CacheManager.instance()
+ .cache("schema-id-" +
+ otherGraphName,
+ 10L);
+ Object arrayCaches = idCache.attachment(newV2SchemaCaches(10));
+ Id arrayCacheId = IdGenerator.of(1);
+ SchemaElement arrayCacheSchema =
+ new FakeObjects("unit-test-v2")
+ .newPropertyKey(arrayCacheId, "fake-pk-array");
+
+ try {
+ clearV2SchemaCaches(arrayCaches);
+ setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, arrayCacheId,
+ arrayCacheSchema);
+ idCache.update(IdGenerator.of(1), "fake-pk-by-id");
+ nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name");
+ otherIdCache.update(IdGenerator.of(2), "other-pk-by-id");
+
+ Assert.assertEquals(1L, idCache.size());
+ Assert.assertEquals(1L, nameCache.size());
+ Assert.assertEquals(1L, otherIdCache.size());
+ Assert.assertSame(arrayCacheSchema,
+ getV2SchemaCache(arrayCaches,
+ HugeType.PROPERTY_KEY,
+ arrayCacheId));
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{String.class},
+ "clearSchemaCache", graphName);
+
+ Assert.assertEquals(0L, idCache.size());
+ Assert.assertEquals(0L, nameCache.size());
+ Assert.assertEquals(1L, otherIdCache.size());
+ Assert.assertNull(getV2SchemaCache(arrayCaches,
+ HugeType.PROPERTY_KEY,
+ arrayCacheId));
+ } finally {
+ clearV2SchemaCaches(arrayCaches);
+ idCache.clear();
+ nameCache.clear();
+ otherIdCache.clear();
+ }
+ }
+
+ private static Object newV2SchemaCaches(int size) {
+ for (Class<?> clazz :
+ CachedSchemaTransactionV2.class.getDeclaredClasses()) {
+ if (!"SchemaCaches".equals(clazz.getSimpleName())) {
+ continue;
+ }
+ try {
+ Constructor<?> constructor =
+ clazz.getDeclaredConstructor(int.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(size);
+ } catch (ReflectiveOperationException e) {
+ throw new AssertionError("Failed to create SchemaCaches", e);
+ }
+ }
+ throw new AssertionError("SchemaCaches class not found");
+ }
+
+ private static void clearV2SchemaCaches(Object arrayCaches) {
+ Whitebox.invoke(arrayCaches.getClass(), "clear", arrayCaches);
+ }
+
+ private static void setV2SchemaCache(Object arrayCaches, HugeType type,
+ Id id, SchemaElement schema) {
+ Whitebox.invoke(arrayCaches.getClass(),
+ new Class<?>[]{HugeType.class, Id.class,
+ SchemaElement.class},
+ "set", arrayCaches, type, id, schema);
+ }
+
+ private static SchemaElement getV2SchemaCache(Object arrayCaches,
+ HugeType type, Id id) {
+ return Whitebox.invoke(arrayCaches.getClass(),
+ new Class<?>[]{HugeType.class, Id.class},
+ "get", arrayCaches, type, id);
+ }
+
+ @Test
+ public void testListenSchemaCacheClearIsIdempotent() throws Exception {
+ // Once the JVM-global registration flag is set, every subsequent
+ // call to listenSchemaCacheClear() must short-circuit before
+ // touching MetaManager — even under concurrent invocation. Pre-set
+ // the flag, race N threads, and verify none of them propagated an
+ // exception (which would happen if MetaManager.instance()
+ // .listenSchemaCacheClear were invoked without an initialised
+ // driver).
+ Field flagField = CachedSchemaTransactionV2.class
+ .getDeclaredField("metaEventListenerRegistered");
+ flagField.setAccessible(true);
+ AtomicBoolean flag = (AtomicBoolean) flagField.get(null);
+ boolean previous = flag.getAndSet(true);
+ try {
+ int threads = 8;
+ CountDownLatch start = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(threads);
+ AtomicInteger failures = new AtomicInteger();
+ for (int i = 0; i < threads; i++) {
+ new Thread(() -> {
+ try {
+ start.await();
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ "listenSchemaCacheClear");
+ } catch (Throwable t) {
+ failures.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+ start.countDown();
+ Assert.assertTrue("listenSchemaCacheClear race timed out",
+ done.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals("listenSchemaCacheClear must short-circuit " +
+ "when already registered", 0, failures.get());
+ Assert.assertTrue("registration flag must remain set", flag.get());
+ } finally {
+ flag.set(previous);
+ }
+ }
+
+ @Test
+ public void testClearSchemaCacheClearsArrayAttachmentMaps()
+ throws Exception {
+ // clearSchemaCache() must wipe idCache, nameCache and every internal
+ // IntObjectMap (pks/vls/els/ils) inside the array attachment so
+ // stale entries are not served after a meta event.
+ String graphName = "DEFAULT-unit-test-v2-array";
+ Cache<Id, Object> idCache =
+ CacheManager.instance().cache("schema-id-" + graphName, 10L);
+ Cache<Id, Object> nameCache =
+ CacheManager.instance().cache("schema-name-" + graphName, 10L);
+ // Size must comfortably exceed the largest id below: IntObjectMap
+ // grows by doubling and refuses to write past currentSize even after
+ // a single expansion, so a small capacity rejects mid-range keys.
+ Object arrayCaches = idCache.attachment(newV2SchemaCaches(64));
+ Id pkId = IdGenerator.of(1);
+ Id vlId = IdGenerator.of(2);
+ Id elId = IdGenerator.of(3);
+ Id ilId = IdGenerator.of(4);
+ FakeObjects fakeObjects = new FakeObjects("unit-test-v2-array");
+ SchemaElement pk = fakeObjects.newPropertyKey(pkId, "fake-pk");
+
+ try {
+ clearV2SchemaCaches(arrayCaches);
+ setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, pkId, pk);
+ setV2SchemaCache(arrayCaches, HugeType.VERTEX_LABEL, vlId, pk);
+ setV2SchemaCache(arrayCaches, HugeType.EDGE_LABEL, elId, pk);
+ setV2SchemaCache(arrayCaches, HugeType.INDEX_LABEL, ilId, pk);
+ idCache.update(pkId, "fake-pk-by-id");
+ nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name");
+
+ Assert.assertEquals(1L, idCache.size());
+ Assert.assertEquals(1L, nameCache.size());
+ Assert.assertNotNull(getV2SchemaCache(arrayCaches,
+ HugeType.PROPERTY_KEY,
pkId));
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{String.class},
+ "clearSchemaCache", graphName);
+
+ Assert.assertEquals(0L, idCache.size());
+ Assert.assertEquals(0L, nameCache.size());
+ for (String mapName : new String[]{"pks", "vls", "els", "ils"}) {
+ Object intMap = readField(arrayCaches, mapName);
+ assertIntObjectMapEmpty(intMap, mapName);
+ }
+ Map<HugeType, Boolean> cachedTypes = readField(arrayCaches,
+ "cachedTypes");
+ Assert.assertTrue("cachedTypes must be empty after clear",
+ cachedTypes.isEmpty());
+ } finally {
+ clearV2SchemaCaches(arrayCaches);
+ idCache.clear();
+ nameCache.clear();
+ }
+ }
+
+ // TASK_SYNC_DELETION gating of removeSchema notifications and the
+ // unconditional addSchema notification require an initialised
+ // CachedSchemaTransactionV2 instance, which in turn needs an hstore
+ // backend and a connected MetaManager. Both prerequisites are out of
+ // scope for this unit test class. They are exercised end-to-end by the
+ // hstore integration tests in CoreTestSuite. TODO(#2617): port these
+ // assertions into a dedicated CachedSchemaTransactionV2IT once
+ // mockito-inline becomes available so MetaManager.instance() can be
+ // stubbed without an hstore cluster.
+
+ @Test
+ public void testHandleSchemaCacheClearEventSkipsLocalSource()
+ throws Exception {
+ String graphName = "DEFAULT-meta-local-source-v2";
+ Cache<Id, Object> idCache =
+ CacheManager.instance().cache("schema-id-" + graphName, 10L);
+ Cache<Id, Object> nameCache =
+ CacheManager.instance()
+ .cache("schema-name-" + graphName, 10L);
+
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ Object localResponse = new Object();
+ Object remoteResponse = new Object();
+ String localSource = schemaCacheClearSource();
+ Mockito.when(mockDriver.extractValuesFromResponse(localResponse))
+ .thenReturn(Collections.singletonList(
+ MetaManager.schemaCacheClearEventValue(graphName,
+ localSource)));
+ Mockito.when(mockDriver.extractValuesFromResponse(remoteResponse))
+ .thenReturn(Collections.singletonList(
+ MetaManager.schemaCacheClearEventValue(graphName,
+ "remote")));
+
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ try {
+ idCache.update(IdGenerator.of(1), "v");
+ nameCache.update(IdGenerator.of("n"), "v");
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{Object.class},
+ "handleSchemaCacheClearEvent",
+ localResponse);
+
+ Assert.assertEquals("local echo must not clear id cache",
+ 1L, idCache.size());
+ Assert.assertEquals("local echo must not clear name cache",
+ 1L, nameCache.size());
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{Object.class},
+ "handleSchemaCacheClearEvent",
+ remoteResponse);
+
+ Assert.assertEquals(0L, idCache.size());
+ Assert.assertEquals(0L, nameCache.size());
+ } finally {
+ swapMetaDriver(originalDriver);
+ idCache.clear();
+ nameCache.clear();
+ }
+ }
+
+ @Test
+ public void testHandleSchemaCacheClearEventClearsTargetGraphOnly()
+ throws Exception {
+ // End-to-end coverage of the meta-event consumer:
+ // publish (response) -> MetaManager extract -> clearSchemaCache
+ // We bypass the live etcd/PD watch by stubbing MetaDriver on the
+ // MetaManager singleton and invoking the package-private consumer
+ // directly. This validates that only the targeted graph's caches are
+ // cleared and that other graphs in the same JVM are left untouched.
+ String targetGraph = "DEFAULT-meta-target-v2";
+ String otherGraph = "DEFAULT-meta-other-v2";
+
+ Cache<Id, Object> targetIdCache =
+ CacheManager.instance().cache("schema-id-" + targetGraph, 10L);
+ Cache<Id, Object> targetNameCache =
+ CacheManager.instance()
+ .cache("schema-name-" + targetGraph, 10L);
+ Cache<Id, Object> otherIdCache =
+ CacheManager.instance().cache("schema-id-" + otherGraph, 10L);
+
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ Object response = new Object();
+ Mockito.when(mockDriver.extractValuesFromResponse(response))
+ .thenReturn(Arrays.asList(targetGraph));
+
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ try {
+ targetIdCache.update(IdGenerator.of(1), "v");
+ targetNameCache.update(IdGenerator.of("n"), "v");
+ otherIdCache.update(IdGenerator.of(2), "v");
+
+ Assert.assertEquals(1L, targetIdCache.size());
+ Assert.assertEquals(1L, targetNameCache.size());
+ Assert.assertEquals(1L, otherIdCache.size());
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{Object.class},
+ "handleSchemaCacheClearEvent", response);
+
+ Assert.assertEquals(0L, targetIdCache.size());
+ Assert.assertEquals(0L, targetNameCache.size());
+ Assert.assertEquals("Other graph caches must remain untouched",
+ 1L, otherIdCache.size());
+ } finally {
+ swapMetaDriver(originalDriver);
+ targetIdCache.clear();
+ targetNameCache.clear();
+ otherIdCache.clear();
+ }
+ }
+
+ @Test
+ public void testHandleSchemaCacheClearEventNullGraphsIsNoop()
+ throws Exception {
+ // A response that yields no graph names (extractor returns null) must
+ // be a strict noop: caches stay populated.
+ String graphName = "DEFAULT-meta-noop-v2";
+ Cache<Id, Object> idCache =
+ CacheManager.instance().cache("schema-id-" + graphName, 10L);
+
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ Object response = new Object();
+ Mockito.when(mockDriver.extractValuesFromResponse(response))
+ .thenReturn(null);
+
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ try {
+ idCache.update(IdGenerator.of(1), "v");
+ Assert.assertEquals(1L, idCache.size());
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{Object.class},
+ "handleSchemaCacheClearEvent", response);
+
+ Assert.assertEquals("noop response must not clear any cache",
+ 1L, idCache.size());
+ } finally {
+ swapMetaDriver(originalDriver);
+ idCache.clear();
+ }
+ }
+
+ @Test
+ public void testHandleSchemaCacheClearEventClearsMultipleGraphs()
+ throws Exception {
+ // A single meta event may carry multiple graph names; every one of
+ // them must have its V2 caches cleared.
+ String graphA = "DEFAULT-meta-multi-a";
+ String graphB = "DEFAULT-meta-multi-b";
+ Cache<Id, Object> idA =
+ CacheManager.instance().cache("schema-id-" + graphA, 10L);
+ Cache<Id, Object> idB =
+ CacheManager.instance().cache("schema-id-" + graphB, 10L);
+
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ Object response = new Object();
+ Mockito.when(mockDriver.extractValuesFromResponse(response))
+ .thenReturn(Arrays.asList(graphA, graphB));
+
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ try {
+ idA.update(IdGenerator.of(1), "v");
+ idB.update(IdGenerator.of(2), "v");
+ Assert.assertEquals(1L, idA.size());
+ Assert.assertEquals(1L, idB.size());
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ new Class<?>[]{Object.class},
+ "handleSchemaCacheClearEvent", response);
+
+ Assert.assertEquals(0L, idA.size());
+ Assert.assertEquals(0L, idB.size());
+ } finally {
+ swapMetaDriver(originalDriver);
+ idA.clear();
+ idB.clear();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testListenSchemaCacheClearRegistersOnlyOnce() throws Exception
{
+ // Two CachedSchemaTransactionV2 instances in the same JVM must share
+ // the JVM-global meta listener: only ONE underlying watch should be
+ // installed even if listenSchemaCacheClear() is invoked multiple
+ // times. We assert this directly against the MetaDriver mock.
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ GraphMetaManager mockGraphMgr =
+ new GraphMetaManager(mockDriver, "test-cluster");
+
+ AtomicBoolean flag = metaListenerFlag();
+ boolean previousFlag = flag.getAndSet(false);
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ Object originalGraphMgr = swapGraphMetaManager(mockGraphMgr);
+ try {
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ "listenSchemaCacheClear");
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ "listenSchemaCacheClear");
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ "listenSchemaCacheClear");
+
+ ArgumentCaptor<Consumer> captor =
+ ArgumentCaptor.forClass(Consumer.class);
+ Mockito.verify(mockDriver, Mockito.times(1))
+ .listen(Mockito.anyString(), captor.capture());
+ Assert.assertNotNull("registered consumer must not be null",
+ captor.getValue());
+ Assert.assertTrue("flag must be set after successful registration",
+ flag.get());
+ } finally {
+ flag.set(previousFlag);
+ swapMetaDriver(originalDriver);
+ swapGraphMetaManager(originalGraphMgr);
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testListenSchemaCacheClearEndToEnd() throws Exception {
+ // Full publish -> callback -> clear path: register the listener via
+ // the production code, capture the consumer that was wired into the
+ // MetaDriver, then invoke it as the watch would and assert the V2
+ // caches for the named graph are cleared.
+ String graphName = "DEFAULT-end-to-end-v2";
+ Cache<Id, Object> idCache =
+ CacheManager.instance().cache("schema-id-" + graphName, 10L);
+ Cache<Id, Object> nameCache =
+ CacheManager.instance()
+ .cache("schema-name-" + graphName, 10L);
+
+ MetaDriver mockDriver = Mockito.mock(MetaDriver.class);
+ Object response = new Object();
+ Mockito.when(mockDriver.extractValuesFromResponse(response))
+ .thenReturn(Collections.singletonList(graphName));
+ GraphMetaManager mockGraphMgr =
+ new GraphMetaManager(mockDriver, "test-cluster");
+
+ AtomicBoolean flag = metaListenerFlag();
+ boolean previousFlag = flag.getAndSet(false);
+ MetaDriver originalDriver = swapMetaDriver(mockDriver);
+ Object originalGraphMgr = swapGraphMetaManager(mockGraphMgr);
+ try {
+ idCache.update(IdGenerator.of(1), "v");
+ nameCache.update(IdGenerator.of("n"), "v");
+ Assert.assertEquals(1L, idCache.size());
+ Assert.assertEquals(1L, nameCache.size());
+
+ Whitebox.invokeStatic(CachedSchemaTransactionV2.class,
+ "listenSchemaCacheClear");
+
+ ArgumentCaptor<Consumer> captor =
+ ArgumentCaptor.forClass(Consumer.class);
+ Mockito.verify(mockDriver)
+ .listen(Mockito.anyString(), captor.capture());
+
+ // Simulate the meta server publishing a schema-cache-clear event:
+ // invoke the consumer captured above with a synthetic response.
+ captor.getValue().accept(response);
+
+ Assert.assertEquals(0L, idCache.size());
+ Assert.assertEquals(0L, nameCache.size());
+ } finally {
+ flag.set(previousFlag);
+ swapMetaDriver(originalDriver);
+ swapGraphMetaManager(originalGraphMgr);
+ idCache.clear();
+ nameCache.clear();
+ }
+ }
+
+ private static AtomicBoolean metaListenerFlag() throws Exception {
+ Field f = CachedSchemaTransactionV2.class
+ .getDeclaredField("metaEventListenerRegistered");
+ f.setAccessible(true);
+ return (AtomicBoolean) f.get(null);
+ }
+
+ private static String schemaCacheClearSource() throws Exception {
+ Field f = CachedSchemaTransactionV2.class
+ .getDeclaredField("SCHEMA_CACHE_CLEAR_SOURCE");
+ f.setAccessible(true);
+ return (String) f.get(null);
+ }
+
+ private static MetaDriver swapMetaDriver(MetaDriver replacement)
+ throws Exception {
+ Field f = MetaManager.class.getDeclaredField("metaDriver");
+ f.setAccessible(true);
+ MetaDriver previous = (MetaDriver) f.get(MetaManager.instance());
+ f.set(MetaManager.instance(), replacement);
+ return previous;
+ }
+
+ private static Object swapGraphMetaManager(Object replacement)
+ throws Exception {
+ Field f = MetaManager.class.getDeclaredField("graphMetaManager");
+ f.setAccessible(true);
+ Object previous = f.get(MetaManager.instance());
+ f.set(MetaManager.instance(), replacement);
+ return previous;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T readField(Object target, String name)
+ throws ReflectiveOperationException {
+ Field field = target.getClass().getDeclaredField(name);
+ field.setAccessible(true);
+ return (T) field.get(target);
+ }
+
+ private static void assertIntObjectMapEmpty(Object intMap, String label)
+ throws ReflectiveOperationException {
+ Object array = readField(intMap, "array");
+ if (array instanceof Object[]) {
+ for (Object slot : (Object[]) array) {
+ Assert.assertNull(label + " slot must be null after clear",
+ slot);
+ }
+ return;
+ }
+ // Older IntObjectMap implementations expose a size accessor instead
+ // of a raw array; fall back to that if reflection finds no array.
+ Object size = Whitebox.invoke(intMap.getClass(), "size", intMap);
+ Assert.assertEquals(label + " must report size 0 after clear",
+ 0, ((Number) size).intValue());
+ }
+
@Test
public void testResetCachedAllIfReachedCapacity() throws Exception {
CachedSchemaTransaction cache = this.cache();