This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6a2178f87d2 [To dev/1.3] Fixed the schema cache calculation 2 & The
potential NPE caused by concurrent invalidate and update (#16834) & The schema
cache is not cleared for "clear schema cache" (#16833)
6a2178f87d2 is described below
commit 6a2178f87d257834af467cac648161d5e7e2018d
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 2 12:20:19 2025 +0800
[To dev/1.3] Fixed the schema cache calculation 2 & The potential NPE
caused by concurrent invalidate and update (#16834) & The schema cache is not
cleared for "clear schema cache" (#16833)
* fix
* fix
* ffff
* 13-fix
* fix
* fix
* refactor
---
.../impl/DataNodeInternalRPCServiceImpl.java | 7 ++-
.../analyze/cache/schema/DeviceCacheEntry.java | 6 ++-
.../plan/analyze/cache/schema/DeviceLastCache.java | 46 ++++++++++------
.../analyze/cache/schema/DeviceNormalSchema.java | 4 +-
.../analyze/cache/schema/DeviceSchemaCache.java | 4 +-
.../dualkeycache/impl/CacheEntryGroupImpl.java | 6 +++
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 61 +++++++++-------------
.../schema/dualkeycache/impl/ICacheEntryGroup.java | 3 ++
.../db/queryengine/plan/parser/ASTVisitor.java | 4 +-
.../db/metadata/cache/DataNodeSchemaCacheTest.java | 13 +++++
.../commons/schema/cache/CacheClearOptions.java | 2 +-
11 files changed, 89 insertions(+), 67 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9d837b6f8fb..7d25b5c0d35 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1974,16 +1974,15 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
|| options.contains(CacheClearOptions.QUERY)) {
storageEngine.clearCache();
}
- if (options.contains(CacheClearOptions.QUERY)
- && options.contains(CacheClearOptions.TREE_SCHEMA)) {
+ if (options.contains(CacheClearOptions.QUERY) &&
options.contains(CacheClearOptions.SCHEMA)) {
schemaCache.getDeviceSchemaCache().invalidateAll();
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
if (options.contains(CacheClearOptions.QUERY)) {
schemaCache.getDeviceSchemaCache().invalidateLastCache();
}
- if (options.contains(CacheClearOptions.TREE_SCHEMA)) {
- schemaCache.getDeviceSchemaCache().invalidateTreeSchema();
+ if (options.contains(CacheClearOptions.SCHEMA)) {
+ schemaCache.getDeviceSchemaCache().invalidateSchema();
}
} catch (final Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
index a12b9c7661b..3337e2f1420 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java
@@ -97,8 +97,10 @@ public class DeviceCacheEntry {
final AtomicInteger size = new AtomicInteger(0);
deviceSchema.updateAndGet(
schema -> {
- size.set(schema.estimateSize());
- return schema;
+ if (Objects.nonNull(schema)) {
+ size.set(schema.estimateSize());
+ }
+ return null;
});
return size.get();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
index 36fc39c3286..9ebc1fc413c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java
@@ -99,12 +99,12 @@ public class DeviceLastCache {
(measurementKey, tvPair) -> {
if (Objects.isNull(newPair)) {
diff.addAndGet(
- -((int) RamUsageEstimator.sizeOf(measurement) +
getTVPairEntrySize(tvPair)));
+ -((int) RamUsageEstimator.sizeOf(measurement) +
getTvPairEntrySize(tvPair)));
return null;
}
if (Objects.isNull(tvPair)) {
diff.addAndGet(
- (int) RamUsageEstimator.sizeOf(measurement) +
getTVPairEntrySize(newPair));
+ (int) RamUsageEstimator.sizeOf(measurement) +
getTvPairEntrySize(newPair));
return newPair;
}
return tvPair;
@@ -128,7 +128,9 @@ public class DeviceLastCache {
for (int i = 0; i < measurements.length; ++i) {
if (Objects.isNull(timeValuePairs[i])) {
if (invalidateNull) {
- measurement2CachedLastMap.remove(measurements[i]);
+ diff.addAndGet(
+ -((int) RamUsageEstimator.sizeOf(measurements[i])
+ +
getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
}
continue;
}
@@ -164,7 +166,7 @@ public class DeviceLastCache {
measurement2CachedLastMap.computeIfPresent(
measurement,
(s, timeValuePair) -> {
- diff.set((int) RamUsageEstimator.sizeOf(s) +
getTVPairEntrySize(timeValuePair));
+ diff.set((int) RamUsageEstimator.sizeOf(s) +
getTvPairEntrySize(timeValuePair));
time.set(timeValuePair.getTimestamp());
return null;
});
@@ -175,7 +177,7 @@ public class DeviceLastCache {
"",
(s, timeValuePair) -> {
if (timeValuePair.getTimestamp() <= time.get()) {
- diff.addAndGet(getTVPairEntrySize(timeValuePair));
+ diff.addAndGet((int) RamUsageEstimator.sizeOf(s) +
getTvPairEntrySize(timeValuePair));
return null;
}
return timeValuePair;
@@ -184,13 +186,18 @@ public class DeviceLastCache {
return diff.get();
}
- private int getTVPairEntrySize(final TimeValuePair tvPair) {
- return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
- + ((Objects.isNull(tvPair)
- || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
- || tvPair == EMPTY_TIME_VALUE_PAIR)
- ? 0
- : tvPair.getSize());
+ private static int getTvPairEntrySize(final TimeValuePair tvPair) {
+ return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY +
getTvPairSize(tvPair);
+ }
+
+ private static int getTvPairSize(final TimeValuePair tvPair) {
+ return isEmptyTvPair(tvPair) ? 0 : tvPair.getSize();
+ }
+
+ private static boolean isEmptyTvPair(final TimeValuePair tvPair) {
+ return Objects.isNull(tvPair)
+ || tvPair == PLACEHOLDER_TIME_VALUE_PAIR
+ || tvPair == EMPTY_TIME_VALUE_PAIR;
}
@Nullable
@@ -202,16 +209,21 @@ public class DeviceLastCache {
int estimateSize() {
return INSTANCE_SIZE
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY *
measurement2CachedLastMap.size()
- + measurement2CachedLastMap.values().stream()
- .mapToInt(TimeValuePair::getSize)
+ + measurement2CachedLastMap.entrySet().stream()
+ .mapToInt(
+ entry ->
+ (int) RamUsageEstimator.sizeOf(entry.getKey())
+ + DeviceLastCache.getTvPairSize(entry.getValue()))
.reduce(0, Integer::sum);
}
private static int getDiffSize(
final TimeValuePair oldTimeValuePair, final TimeValuePair
newTimeValuePair) {
- if (oldTimeValuePair == EMPTY_TIME_VALUE_PAIR
- || oldTimeValuePair == PLACEHOLDER_TIME_VALUE_PAIR) {
- return newTimeValuePair.getSize();
+ if (isEmptyTvPair(oldTimeValuePair)) {
+ return getTvPairSize(newTimeValuePair);
+ }
+ if (isEmptyTvPair(newTimeValuePair)) {
+ return -getTvPairSize(oldTimeValuePair);
}
final TsPrimitiveType oldValue = oldTimeValuePair.getValue();
final TsPrimitiveType newValue = newTimeValuePair.getValue();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
index 5ce2ca06fcd..6d50905bbca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java
@@ -99,13 +99,13 @@ public class DeviceNormalSchema implements IDeviceSchema {
public int estimateSize() {
// Do not need to calculate database because it is interned
return INSTANCE_SIZE
+ + measurementMap.size() * (int)
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+ measurementMap.entrySet().stream()
.mapToInt(
entry ->
Math.toIntExact(
RamUsageEstimator.sizeOf(entry.getKey())
- + SchemaCacheEntry.estimateSize(entry.getValue())
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
+ + SchemaCacheEntry.estimateSize(entry.getValue())))
.reduce(0, Integer::sum);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
index 8d66cb3d619..f13e2030cf9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java
@@ -229,7 +229,7 @@ public class DeviceSchemaCache {
return dualKeyCache.stats().requestCount();
}
- long getMemoryUsage() {
+ public long getMemoryUsage() {
return dualKeyCache.stats().memoryUsage();
}
@@ -284,7 +284,7 @@ public class DeviceSchemaCache {
}
}
- public void invalidateTreeSchema() {
+ public void invalidateSchema() {
lock.lock();
try {
dualKeyCache.update(segment -> true, device -> true, entry ->
-entry.invalidateSchema());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
index 54c53a15d50..ea7f502a90b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java
@@ -72,6 +72,12 @@ public class CacheEntryGroupImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
return cacheEntryMap.compute(secondKey, computation.apply(memory));
}
+ @Override
+ public T computeCacheEntryIfPresent(
+ final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation) {
+ return cacheEntryMap.computeIfPresent(secondKey,
computation.apply(memory));
+ }
+
@Override
public long removeCacheEntry(final SK secondKey) {
final T result = cacheEntryMap.remove(secondKey);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index 86d10422249..1778920a23a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -144,7 +144,28 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
@Override
public void update(
final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+ mayEvict();
+ }
+
+ @Override
+ public void update(
+ final Predicate<FK> firstKeyChecker,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
+ for (final FK firstKey : firstKeyMap.getAllKeys()) {
+ if (!firstKeyChecker.test(firstKey)) {
+ continue;
+ }
+ clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
+ }
+ mayEvict();
+ }
+
+ public void clearSecondEntry(
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
if (Objects.nonNull(entryGroup)) {
entryGroup
.getAllCacheEntries()
@@ -153,49 +174,15 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>>
if (!secondKeyChecker.test(entry.getKey())) {
return;
}
- entryGroup.computeCacheEntry(
+ entryGroup.computeCacheEntryIfPresent(
entry.getKey(),
memory ->
(secondKey, cacheEntry) -> {
- if (Objects.nonNull(cacheEntry)) {
-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- }
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
return cacheEntry;
});
});
}
- mayEvict();
- }
-
- @Override
- public void update(
- final Predicate<FK> firstKeyChecker,
- final Predicate<SK> secondKeyChecker,
- final ToIntFunction<V> updater) {
- for (final FK firstKey : firstKeyMap.getAllKeys()) {
- if (!firstKeyChecker.test(firstKey)) {
- continue;
- }
- final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- entryGroup.computeCacheEntry(
- entry.getKey(),
- memory ->
- (secondKey, cacheEntry) -> {
-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
- return cacheEntry;
- });
- });
- }
- mayEvict();
- }
}
private void mayEvict() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
index d1f8ab9923e..0791fadc325 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java
@@ -45,6 +45,9 @@ interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
T computeCacheEntry(
final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation);
+ T computeCacheEntryIfPresent(
+ final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>>
computation);
+
long removeCacheEntry(final SK secondKey);
boolean isEmpty();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 62e7e803b5e..0f60d2f9ba6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3356,12 +3356,12 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
clearCacheStatement.setOnCluster(ctx.LOCAL() == null);
if (ctx.SCHEMA() != null) {
-
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.TREE_SCHEMA));
+
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.SCHEMA));
} else if (ctx.QUERY() != null) {
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.QUERY));
} else if (ctx.ALL() != null) {
clearCacheStatement.setOptions(
- new HashSet<>(Arrays.asList(CacheClearOptions.TREE_SCHEMA,
CacheClearOptions.QUERY)));
+ new HashSet<>(Arrays.asList(CacheClearOptions.SCHEMA,
CacheClearOptions.QUERY)));
} else {
clearCacheStatement.setOptions(Collections.singleton(CacheClearOptions.DEFAULT));
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
index d2f11d65d1f..144f2396ef7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCacheTest.java
@@ -260,6 +260,11 @@ public class DataNodeSchemaCacheTest {
Assert.assertEquals(
new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
dataNodeSchemaCache.getLastCache(new MeasurementPath("root.db.d.s3")));
+
+
Assert.assertTrue(dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage() >
0);
+
+ dataNodeSchemaCache.cleanUp();
+ Assert.assertEquals(0,
dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
}
@Test
@@ -313,5 +318,13 @@ public class DataNodeSchemaCacheTest {
Assert.assertEquals(1, measurementPaths.size());
Assert.assertEquals(TSDataType.FLOAT,
measurementPaths.get(0).getMeasurementSchema().getType());
Assert.assertEquals("root.sg1.d3.s1",
measurementPaths.get(0).getFullPath());
+
+ dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+ dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+ dataNodeSchemaCache.getDeviceSchemaCache().invalidateSchema();
+
Assert.assertTrue(dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage() >
0);
+
+ dataNodeSchemaCache.getDeviceSchemaCache().invalidateAll();
+ Assert.assertEquals(0,
dataNodeSchemaCache.getDeviceSchemaCache().getMemoryUsage());
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
index 3ee5ba02f77..076f859eae7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/cache/CacheClearOptions.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.commons.schema.cache;
public enum CacheClearOptions {
- TREE_SCHEMA,
+ SCHEMA,
QUERY,
DEFAULT,
}