This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 761f4e73f54 Pipe: Fixed the problem of not being able to write normally due to insufficient memory (#15701)
761f4e73f54 is described below
commit 761f4e73f5450681de534d59d1344393976fb184
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jun 11 18:55:45 2025 +0800
Pipe: Fixed the problem of not being able to write normally due to insufficient memory (#15701)
* Pipe: Fixed the problem of not being able to write normally due to insufficient memory
* fix
* fix
* fix
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 --
.../overview/PipeWALInsertNodeCacheMetrics.java | 92 +++-------------------
.../wal/checkpoint/CheckpointManager.java | 4 +-
.../dataregion/wal/utils/WALEntryPosition.java | 2 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 43 ++++------
.../dataregion/wal/node/WALEntryHandlerTest.java | 2 +-
.../wal/node/WalDeleteOutdatedNewTest.java | 2 +-
.../wal/utils/WALInsertNodeCacheTest.java | 2 +-
9 files changed, 30 insertions(+), 133 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index db77ef7ce2d..8137959a391 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -151,8 +151,6 @@ public class IoTDBConfig {
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 /
10;
- private long allocateMemoryPerWalCache = 512 * 1024;
-
/** Flush proportion for system */
private double flushProportion = 0.4;
@@ -2004,14 +2002,6 @@ public class IoTDBConfig {
this.writeMemoryVariationReportProportion =
writeMemoryVariationReportProportion;
}
- public long getAllocateMemoryPerWalCache() {
- return allocateMemoryPerWalCache;
- }
-
- public void setAllocateMemoryPerWalCache(final long
allocateMemoryForWalCache) {
- this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
- }
-
public boolean isEnablePartialInsert() {
return enablePartialInsert;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 155aab7c5b5..dfd854f724c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2458,12 +2458,6 @@ public class IoTDBDescriptor {
conf.setIotConsensusV2DeletionFileDir(
properties.getProperty(
"iot_consensus_v2_deletion_file_dir",
conf.getIotConsensusV2DeletionFileDir()));
-
- conf.setAllocateMemoryPerWalCache(
- Long.parseLong(
- properties.getProperty(
- "allocate_memory_per_wal_cache",
- Long.toString(conf.getAllocateMemoryPerWalCache()))));
}
private void loadCQProps(TrimProperties properties) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
index deabdcc2f10..2c4ee76153f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java
@@ -27,117 +27,43 @@ import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
-import com.google.common.collect.ImmutableSet;
-import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
public class PipeWALInsertNodeCacheMetrics implements IMetricSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeWALInsertNodeCacheMetrics.class);
- @SuppressWarnings("java:S3077")
- private volatile AbstractMetricService metricService;
-
- private final Map<Integer, WALInsertNodeCache> cacheMap = new
ConcurrentHashMap<>();
-
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
public void bindTo(AbstractMetricService metricService) {
- this.metricService = metricService;
- ImmutableSet<Integer> dataRegionIds =
ImmutableSet.copyOf(cacheMap.keySet());
- for (Integer dataRegionId : dataRegionIds) {
- createMetrics(dataRegionId);
- }
- }
-
- private void createMetrics(Integer dataRegionId) {
- createAutoGauge(dataRegionId);
- }
-
- private void createAutoGauge(Integer dataRegionId) {
metricService.createAutoGauge(
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
MetricLevel.IMPORTANT,
- cacheMap.get(dataRegionId),
- WALInsertNodeCache::getCacheHitRate,
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
+ WALInsertNodeCache.getInstance(),
+ WALInsertNodeCache::getCacheHitRate);
metricService.createAutoGauge(
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
MetricLevel.IMPORTANT,
- cacheMap.get(dataRegionId),
- WALInsertNodeCache::getCacheHitCount,
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
+ WALInsertNodeCache.getInstance(),
+ WALInsertNodeCache::getCacheHitCount);
metricService.createAutoGauge(
Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
MetricLevel.IMPORTANT,
- cacheMap.get(dataRegionId),
+ WALInsertNodeCache.getInstance(),
WALInsertNodeCache::getCacheRequestCount,
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
+ Tag.REGION.toString());
}
@Override
public void unbindFrom(AbstractMetricService metricService) {
- ImmutableSet<Integer> dataRegionIds =
ImmutableSet.copyOf(cacheMap.keySet());
- for (Integer dataRegionId : dataRegionIds) {
- deregister(dataRegionId);
- }
- if (!cacheMap.isEmpty()) {
- LOGGER.warn("Failed to unbind from wal insert node cache metrics, cache
map not empty");
- }
- }
-
- private void removeMetrics(Integer dataRegionId) {
- removeAutoGauge(dataRegionId);
- }
-
- private void removeAutoGauge(Integer dataRegionId) {
metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
+ MetricType.AUTO_GAUGE,
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString());
metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
+ MetricType.AUTO_GAUGE,
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString());
metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
- Tag.REGION.toString(),
- String.valueOf(dataRegionId));
- }
-
- //////////////////////////// register & deregister (pipe integration)
////////////////////////////
-
- public void register(@NonNull WALInsertNodeCache walInsertNodeCache, Integer
dataRegionId) {
- cacheMap.putIfAbsent(dataRegionId, walInsertNodeCache);
- if (Objects.nonNull(metricService)) {
- createMetrics(dataRegionId);
- }
- }
-
- public void deregister(Integer dataRegionId) {
- // TODO: waiting called by WALInsertNodeCache
- if (!cacheMap.containsKey(dataRegionId)) {
- LOGGER.warn(
- "Failed to deregister wal insert node cache metrics,
WALInsertNodeCache({}) does not exist",
- dataRegionId);
- return;
- }
- if (Objects.nonNull(metricService)) {
- removeMetrics(dataRegionId);
- }
- cacheMap.remove(dataRegionId);
+ MetricType.AUTO_GAUGE,
Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString());
}
//////////////////////////// singleton ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 1c859f1eae4..afa6651dfa4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -279,7 +279,7 @@ public class CheckpointManager implements AutoCloseable {
}
MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
if (!memTableInfo.isPinned()) {
-
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).addMemTable(memTableId);
+ WALInsertNodeCache.getInstance().addMemTable(memTableId);
}
memTableInfo.pin();
} finally {
@@ -309,7 +309,7 @@ public class CheckpointManager implements AutoCloseable {
MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
memTableInfo.unpin();
if (!memTableInfo.isPinned()) {
-
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).removeMemTable(memTableId);
+ WALInsertNodeCache.getInstance().removeMemTable(memTableId);
if (memTableInfo.isFlushed()) {
memTableId2Info.remove(memTableId);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index a5622d29c49..06ced2c32fa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -190,7 +190,7 @@ public class WALEntryPosition {
public void setWalNode(WALNode walNode, long memTableId) {
this.walNode = walNode;
identifier = walNode.getIdentifier();
- cache = WALInsertNodeCache.getInstance(walNode.getRegionId(memTableId));
+ cache = WALInsertNodeCache.getInstance();
}
public String getIdentifier() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 46c3d962754..431e8d0ba51 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
@@ -39,7 +38,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
import org.apache.tsfile.utils.Pair;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,10 +63,6 @@ public class WALInsertNodeCache {
private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
- private final PipeModelFixedMemoryBlock memoryBlock;
-
- // Used to adjust the memory usage of the cache
- private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
@@ -77,16 +71,15 @@ public class WALInsertNodeCache {
private volatile boolean hasPipeRunning = false;
- private WALInsertNodeCache(final Integer dataRegionId) {
+ private WALInsertNodeCache() {
if (walModelFixedMemory == null) {
init();
}
- final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
-
- memoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(requestedAllocateSize,
PipeMemoryBlockType.WAL);
+ final long requestedAllocateSize =
+ (long)
+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
+ * PIPE_CONFIG.getPipeDataStructureWalMemoryProportion());
lruCache =
Caffeine.newBuilder()
@@ -96,12 +89,9 @@ public class WALInsertNodeCache {
(position, pair) -> {
long weightInLong = 0L;
if (pair.right != null) {
- weightInLong =
- (long)
- (InsertNodeMemoryEstimator.sizeOf(pair.right)
- * memoryUsageCheatFactor.get());
+ weightInLong =
InsertNodeMemoryEstimator.sizeOf(pair.right);
} else {
- weightInLong = (long) (position.getSize() *
memoryUsageCheatFactor.get());
+ weightInLong = position.getSize();
}
if (weightInLong <= 0) {
return Integer.MAX_VALUE;
@@ -111,8 +101,6 @@ public class WALInsertNodeCache {
})
.recordStats()
.build(new WALInsertNodeCacheLoader());
-
- PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
// please call this method at PipeLauncher
@@ -124,7 +112,11 @@ public class WALInsertNodeCache {
// Allocate memory for the fixed memory block of WAL
walModelFixedMemory =
PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(0L,
PipeMemoryBlockType.WAL);
+ .forceAllocateForModelFixedMemoryBlock(
+ (long)
+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
+ *
PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()),
+ PipeMemoryBlockType.WAL);
} catch (Exception e) {
LOGGER.error("Failed to initialize WAL model fixed memory block", e);
walModelFixedMemory =
@@ -318,17 +310,13 @@ public class WALInsertNodeCache {
/////////////////////////// Singleton ///////////////////////////
- public static WALInsertNodeCache getInstance(final Integer regionId) {
- return InstanceHolder.getOrCreateInstance(regionId);
+ public static WALInsertNodeCache getInstance() {
+ return InstanceHolder.INSTANCE;
}
private static class InstanceHolder {
- private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new
ConcurrentHashMap<>();
-
- public static WALInsertNodeCache getOrCreateInstance(final Integer key) {
- return INSTANCE_MAP.computeIfAbsent(key, k -> new
WALInsertNodeCache(key));
- }
+ public static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
private InstanceHolder() {
// forbidding instantiation
@@ -345,7 +333,6 @@ public class WALInsertNodeCache {
@TestOnly
public void clear() {
lruCache.invalidateAll();
- memoryBlock.close();
memTablesNeedSearch.clear();
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 0496d58d7dc..d5913e54355 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -94,7 +94,7 @@ public class WALEntryHandlerTest {
config.setWalMode(prevMode);
EnvironmentUtils.cleanDir(logDirectory1);
EnvironmentUtils.cleanDir(logDirectory2);
- WALInsertNodeCache.getInstance(1).clear();
+ WALInsertNodeCache.getInstance().clear();
}
@Test(expected = MemTablePinException.class)
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
index 7109a14b030..593d25b6932 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -88,7 +88,7 @@ public class WalDeleteOutdatedNewTest {
config.setDataRegionConsensusProtocolClass(prevConsensus);
EnvironmentUtils.cleanDir(logDirectory1);
StorageEngine.getInstance().reset();
- WALInsertNodeCache.getInstance(1).clear();
+ WALInsertNodeCache.getInstance().clear();
}
/**
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index d1e20b4b2ef..552c8334f95 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -54,7 +54,7 @@ public class WALInsertNodeCacheTest {
private static final String databasePath = "root.test_sg";
private static final String devicePath = databasePath + ".test_d";
private static final String dataRegionId = "1";
- private static final WALInsertNodeCache cache =
WALInsertNodeCache.getInstance(1);
+ private static final WALInsertNodeCache cache =
WALInsertNodeCache.getInstance();
private WALMode prevMode;
private WALNode walNode;