This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a1fe2418260 Pipe: Avoided the OOM risks by replacing the progressive cheating factor with policy change (#16398)
a1fe2418260 is described below
commit a1fe241826083c5b61e40722c89b80b0f58d901d
Author: Caideyipi <[email protected]>
AuthorDate: Thu Sep 11 17:22:46 2025 +0800
Pipe: Avoided the OOM risks by replacing the progressive cheating factor with policy change (#16398)
---
.../downsampling/PartialPathLastObjectCache.java | 60 ++++++++---------
.../IoTDBDataNodeCacheLeaderClientManager.java | 78 ++++++++++------------
.../event/cache/SubscriptionPollResponseCache.java | 54 +++++++--------
3 files changed, 86 insertions(+), 106 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index 3e423f5c437..e51aed46dc7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.utils.MemUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,35 +34,11 @@ public abstract class PartialPathLastObjectCache<T>
implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartialPathLastObjectCache.class);
private final PipeMemoryBlock allocatedMemoryBlock;
- // Used to adjust the memory usage of the cache
- private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
private final Cache<String, T> partialPath2ObjectCache;
protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(memoryLimitInBytes)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
- .setShrinkCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor * ((double) oldMemory / newMemory));
- LOGGER.info(
- "PartialPathLastObjectCache.allocatedMemoryBlock has
shrunk from {} to {}.",
- oldMemory,
- newMemory);
- })
- .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
memoryLimitInBytes))
- .setExpandCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor / ((double) newMemory / oldMemory));
- LOGGER.info(
- "PartialPathLastObjectCache.allocatedMemoryBlock has
expanded from {} to {}.",
- oldMemory,
- newMemory);
- });
+ allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().tryAllocate(memoryLimitInBytes);
// Currently disable the metric here because it's not a constant cache and
the number may
// fluctuate. In the future all the "processorCache"s may be recorded in
single metric entry
@@ -75,16 +50,37 @@ public abstract class PartialPathLastObjectCache<T>
implements AutoCloseable {
(Weigher<String, T>)
(partialPath, object) -> {
final long weightInLong =
- (long)
- ((MemUtils.getStringMem(partialPath) +
calculateMemoryUsage(object))
- * memoryUsageCheatFactor.get());
- if (weightInLong <= 0) {
- return Integer.MAX_VALUE;
- }
+ MemUtils.getStringMem(partialPath) +
calculateMemoryUsage(object);
final int weightInInt = (int) weightInLong;
return weightInInt != weightInLong ? Integer.MAX_VALUE :
weightInInt;
})
.build();
+
+ allocatedMemoryBlock
+ .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+ .setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ partialPath2ObjectCache
+ .policy()
+ .eviction()
+ .ifPresent(eviction -> eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "PartialPathLastObjectCache.allocatedMemoryBlock has shrunk
from {} to {}.",
+ oldMemory,
+ newMemory);
+ })
+ .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
memoryLimitInBytes))
+ .setExpandCallback(
+ (oldMemory, newMemory) -> {
+ partialPath2ObjectCache
+ .policy()
+ .eviction()
+ .ifPresent(eviction -> eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "PartialPathLastObjectCache.allocatedMemoryBlock has
expanded from {} to {}.",
+ oldMemory,
+ newMemory);
+ });
}
protected abstract long calculateMemoryUsage(final T object);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
index dc24295f936..a9dbd43544e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -27,7 +27,6 @@ import
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +41,6 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(LeaderCacheManager.class);
private static final PipeConfig CONFIG = PipeConfig.getInstance();
- private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
-
// leader cache built by LRU
private final Cache<String, TEndPoint> device2endpoint;
// a hashmap to reuse the created endpoint
@@ -55,52 +52,47 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
// properties required by pipe memory control framework
final PipeMemoryBlock allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(initMemorySizeInBytes)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
- .setShrinkCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor * ((double) oldMemory / newMemory));
- LOGGER.info(
- "LeaderCacheManager.allocatedMemoryBlock has shrunk
from {} to {}.",
- oldMemory,
- newMemory);
- })
- .setExpandMethod(
- oldMemory ->
- Math.min(
- Math.max(oldMemory, 1) * 2,
- (long)
- (PipeDataNodeResourceManager.memory()
- .getTotalNonFloatingMemorySizeInBytes()
- *
CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
- .setExpandCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor / ((double) newMemory / oldMemory));
- LOGGER.info(
- "LeaderCacheManager.allocatedMemoryBlock has expanded
from {} to {}.",
- oldMemory,
- newMemory);
- });
+
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
device2endpoint =
Caffeine.newBuilder()
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
- .weigher(
- (Weigher<String, TEndPoint>)
- (device, endPoint) -> {
- final long weightInLong =
- (long) (device.getBytes().length *
memoryUsageCheatFactor.get());
- if (weightInLong <= 0) {
- return Integer.MAX_VALUE;
- }
- final int weightInInt = (int) weightInLong;
- return weightInInt != weightInLong ? Integer.MAX_VALUE
: weightInInt;
- })
+ .weigher((Weigher<String, TEndPoint>) (device, endPoint) ->
device.getBytes().length)
.recordStats()
.build();
+
+ allocatedMemoryBlock
+ .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+ .setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ device2endpoint
+ .policy()
+ .eviction()
+ .ifPresent(eviction -> eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "LeaderCacheManager.allocatedMemoryBlock has shrunk from
{} to {}.",
+ oldMemory,
+ newMemory);
+ })
+ .setExpandMethod(
+ oldMemory ->
+ Math.min(
+ Math.max(oldMemory, 1) * 2,
+ (long)
+ (PipeDataNodeResourceManager.memory()
+ .getTotalNonFloatingMemorySizeInBytes()
+ *
CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
+ .setExpandCallback(
+ (oldMemory, newMemory) -> {
+ device2endpoint
+ .policy()
+ .eviction()
+ .ifPresent(eviction -> eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "LeaderCacheManager.allocatedMemoryBlock has expanded from
{} to {}.",
+ oldMemory,
+ newMemory);
+ });
}
public TEndPoint getLeaderEndPoint(final String deviceId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 55848c47c01..642442cc1a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -27,7 +27,6 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +43,6 @@ public class SubscriptionPollResponseCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPollResponseCache.class);
- private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
-
private final LoadingCache<CachedSubscriptionPollResponse, ByteBuffer> cache;
public ByteBuffer serialize(final CachedSubscriptionPollResponse response)
throws IOException {
@@ -113,41 +110,36 @@ public class SubscriptionPollResponseCache {
// properties required by pipe memory control framework
final PipeMemoryBlock allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(initMemorySizeInBytes)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
- .setShrinkCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor * ((double) oldMemory / newMemory));
- LOGGER.info(
- "SubscriptionEventBinaryCache.allocatedMemoryBlock has
shrunk from {} to {}.",
- oldMemory,
- newMemory);
- })
- .setExpandMethod(
- oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
maxMemorySizeInBytes))
- .setExpandCallback(
- (oldMemory, newMemory) -> {
- memoryUsageCheatFactor.updateAndGet(
- factor -> factor / ((double) newMemory / oldMemory));
- LOGGER.info(
- "SubscriptionEventBinaryCache.allocatedMemoryBlock has
expanded from {} to {}.",
- oldMemory,
- newMemory);
- });
+
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
this.cache =
Caffeine.newBuilder()
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
.weigher(
(Weigher<CachedSubscriptionPollResponse, ByteBuffer>)
- (message, buffer) -> {
- // TODO: overflow
- return (int) (buffer.capacity() *
memoryUsageCheatFactor.get());
- })
+ (message, buffer) -> buffer.capacity())
.recordStats() // TODO: metrics
// NOTE: lambda CAN NOT be replaced with method reference
- .build(response ->
CachedSubscriptionPollResponse.serialize(response));
+ .build(CachedSubscriptionPollResponse::serialize);
+
+ allocatedMemoryBlock
+ .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+ .setShrinkCallback(
+ (oldMemory, newMemory) -> {
+ cache.policy().eviction().ifPresent(eviction ->
eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "SubscriptionEventBinaryCache.allocatedMemoryBlock has
shrunk from {} to {}.",
+ oldMemory,
+ newMemory);
+ })
+ .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
maxMemorySizeInBytes))
+ .setExpandCallback(
+ (oldMemory, newMemory) -> {
+ cache.policy().eviction().ifPresent(eviction ->
eviction.setMaximum(newMemory));
+ LOGGER.info(
+ "SubscriptionEventBinaryCache.allocatedMemoryBlock has
expanded from {} to {}.",
+ oldMemory,
+ newMemory);
+ });
}
}