This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a1fe2418260 Pipe: Avoided the OOM risks by replacing the progressive 
cheating factor with policy change (#16398)
a1fe2418260 is described below

commit a1fe241826083c5b61e40722c89b80b0f58d901d
Author: Caideyipi <[email protected]>
AuthorDate: Thu Sep 11 17:22:46 2025 +0800

    Pipe: Avoided the OOM risks by replacing the progressive cheating factor 
with policy change (#16398)
---
 .../downsampling/PartialPathLastObjectCache.java   | 60 ++++++++---------
 .../IoTDBDataNodeCacheLeaderClientManager.java     | 78 ++++++++++------------
 .../event/cache/SubscriptionPollResponseCache.java | 54 +++++++--------
 3 files changed, 86 insertions(+), 106 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index 3e423f5c437..e51aed46dc7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.utils.MemUtils;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,35 +34,11 @@ public abstract class PartialPathLastObjectCache<T> 
implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialPathLastObjectCache.class);
 
   private final PipeMemoryBlock allocatedMemoryBlock;
-  // Used to adjust the memory usage of the cache
-  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
 
   private final Cache<String, T> partialPath2ObjectCache;
 
   protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
-    allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(memoryLimitInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
-            .setShrinkCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor * ((double) oldMemory / newMemory));
-                  LOGGER.info(
-                      "PartialPathLastObjectCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
-                      oldMemory,
-                      newMemory);
-                })
-            .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
memoryLimitInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor / ((double) newMemory / oldMemory));
-                  LOGGER.info(
-                      "PartialPathLastObjectCache.allocatedMemoryBlock has 
expanded from {} to {}.",
-                      oldMemory,
-                      newMemory);
-                });
+    allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().tryAllocate(memoryLimitInBytes);
 
     // Currently disable the metric here because it's not a constant cache and 
the number may
     // fluctuate. In the future all the "processorCache"s may be recorded in 
single metric entry
@@ -75,16 +50,37 @@ public abstract class PartialPathLastObjectCache<T> 
implements AutoCloseable {
                 (Weigher<String, T>)
                     (partialPath, object) -> {
                       final long weightInLong =
-                          (long)
-                              ((MemUtils.getStringMem(partialPath) + 
calculateMemoryUsage(object))
-                                  * memoryUsageCheatFactor.get());
-                      if (weightInLong <= 0) {
-                        return Integer.MAX_VALUE;
-                      }
+                          MemUtils.getStringMem(partialPath) + 
calculateMemoryUsage(object);
                       final int weightInInt = (int) weightInLong;
                       return weightInInt != weightInLong ? Integer.MAX_VALUE : 
weightInInt;
                     })
             .build();
+
+    allocatedMemoryBlock
+        .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+        .setShrinkCallback(
+            (oldMemory, newMemory) -> {
+              partialPath2ObjectCache
+                  .policy()
+                  .eviction()
+                  .ifPresent(eviction -> eviction.setMaximum(newMemory));
+              LOGGER.info(
+                  "PartialPathLastObjectCache.allocatedMemoryBlock has shrunk 
from {} to {}.",
+                  oldMemory,
+                  newMemory);
+            })
+        .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
memoryLimitInBytes))
+        .setExpandCallback(
+            (oldMemory, newMemory) -> {
+              partialPath2ObjectCache
+                  .policy()
+                  .eviction()
+                  .ifPresent(eviction -> eviction.setMaximum(newMemory));
+              LOGGER.info(
+                  "PartialPathLastObjectCache.allocatedMemoryBlock has 
expanded from {} to {}.",
+                  oldMemory,
+                  newMemory);
+            });
   }
 
   protected abstract long calculateMemoryUsage(final T object);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
index dc24295f936..a9dbd43544e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,8 +41,6 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LeaderCacheManager.class);
     private static final PipeConfig CONFIG = PipeConfig.getInstance();
 
-    private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
-
     // leader cache built by LRU
     private final Cache<String, TEndPoint> device2endpoint;
     // a hashmap to reuse the created endpoint
@@ -55,52 +52,47 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
 
       // properties required by pipe memory control framework
       final PipeMemoryBlock allocatedMemoryBlock =
-          PipeDataNodeResourceManager.memory()
-              .tryAllocate(initMemorySizeInBytes)
-              .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
-              .setShrinkCallback(
-                  (oldMemory, newMemory) -> {
-                    memoryUsageCheatFactor.updateAndGet(
-                        factor -> factor * ((double) oldMemory / newMemory));
-                    LOGGER.info(
-                        "LeaderCacheManager.allocatedMemoryBlock has shrunk 
from {} to {}.",
-                        oldMemory,
-                        newMemory);
-                  })
-              .setExpandMethod(
-                  oldMemory ->
-                      Math.min(
-                          Math.max(oldMemory, 1) * 2,
-                          (long)
-                              (PipeDataNodeResourceManager.memory()
-                                      .getTotalNonFloatingMemorySizeInBytes()
-                                  * 
CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
-              .setExpandCallback(
-                  (oldMemory, newMemory) -> {
-                    memoryUsageCheatFactor.updateAndGet(
-                        factor -> factor / ((double) newMemory / oldMemory));
-                    LOGGER.info(
-                        "LeaderCacheManager.allocatedMemoryBlock has expanded 
from {} to {}.",
-                        oldMemory,
-                        newMemory);
-                  });
+          
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
 
       device2endpoint =
           Caffeine.newBuilder()
               .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
-              .weigher(
-                  (Weigher<String, TEndPoint>)
-                      (device, endPoint) -> {
-                        final long weightInLong =
-                            (long) (device.getBytes().length * 
memoryUsageCheatFactor.get());
-                        if (weightInLong <= 0) {
-                          return Integer.MAX_VALUE;
-                        }
-                        final int weightInInt = (int) weightInLong;
-                        return weightInInt != weightInLong ? Integer.MAX_VALUE 
: weightInInt;
-                      })
+              .weigher((Weigher<String, TEndPoint>) (device, endPoint) -> 
device.getBytes().length)
               .recordStats()
               .build();
+
+      allocatedMemoryBlock
+          .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+          .setShrinkCallback(
+              (oldMemory, newMemory) -> {
+                device2endpoint
+                    .policy()
+                    .eviction()
+                    .ifPresent(eviction -> eviction.setMaximum(newMemory));
+                LOGGER.info(
+                    "LeaderCacheManager.allocatedMemoryBlock has shrunk from 
{} to {}.",
+                    oldMemory,
+                    newMemory);
+              })
+          .setExpandMethod(
+              oldMemory ->
+                  Math.min(
+                      Math.max(oldMemory, 1) * 2,
+                      (long)
+                          (PipeDataNodeResourceManager.memory()
+                                  .getTotalNonFloatingMemorySizeInBytes()
+                              * 
CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
+          .setExpandCallback(
+              (oldMemory, newMemory) -> {
+                device2endpoint
+                    .policy()
+                    .eviction()
+                    .ifPresent(eviction -> eviction.setMaximum(newMemory));
+                LOGGER.info(
+                    "LeaderCacheManager.allocatedMemoryBlock has expanded from 
{} to {}.",
+                    oldMemory,
+                    newMemory);
+              });
     }
 
     public TEndPoint getLeaderEndPoint(final String deviceId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 55848c47c01..642442cc1a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Weigher;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +43,6 @@ public class SubscriptionPollResponseCache {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionPollResponseCache.class);
 
-  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
-
   private final LoadingCache<CachedSubscriptionPollResponse, ByteBuffer> cache;
 
   public ByteBuffer serialize(final CachedSubscriptionPollResponse response) 
throws IOException {
@@ -113,41 +110,36 @@ public class SubscriptionPollResponseCache {
 
     // properties required by pipe memory control framework
     final PipeMemoryBlock allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(initMemorySizeInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
-            .setShrinkCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor * ((double) oldMemory / newMemory));
-                  LOGGER.info(
-                      "SubscriptionEventBinaryCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
-                      oldMemory,
-                      newMemory);
-                })
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
maxMemorySizeInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor / ((double) newMemory / oldMemory));
-                  LOGGER.info(
-                      "SubscriptionEventBinaryCache.allocatedMemoryBlock has 
expanded from {} to {}.",
-                      oldMemory,
-                      newMemory);
-                });
+        
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
 
     this.cache =
         Caffeine.newBuilder()
             .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
             .weigher(
                 (Weigher<CachedSubscriptionPollResponse, ByteBuffer>)
-                    (message, buffer) -> {
-                      // TODO: overflow
-                      return (int) (buffer.capacity() * 
memoryUsageCheatFactor.get());
-                    })
+                    (message, buffer) -> buffer.capacity())
             .recordStats() // TODO: metrics
             // NOTE: lambda CAN NOT be replaced with method reference
-            .build(response -> 
CachedSubscriptionPollResponse.serialize(response));
+            .build(CachedSubscriptionPollResponse::serialize);
+
+    allocatedMemoryBlock
+        .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
+        .setShrinkCallback(
+            (oldMemory, newMemory) -> {
+              cache.policy().eviction().ifPresent(eviction -> 
eviction.setMaximum(newMemory));
+              LOGGER.info(
+                  "SubscriptionEventBinaryCache.allocatedMemoryBlock has 
shrunk from {} to {}.",
+                  oldMemory,
+                  newMemory);
+            })
+        .setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
maxMemorySizeInBytes))
+        .setExpandCallback(
+            (oldMemory, newMemory) -> {
+              cache.policy().eviction().ifPresent(eviction -> 
eviction.setMaximum(newMemory));
+              LOGGER.info(
+                  "SubscriptionEventBinaryCache.allocatedMemoryBlock has 
expanded from {} to {}.",
+                  oldMemory,
+                  newMemory);
+            });
   }
 }

Reply via email to