This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d39619f828 Pipe: Use fixed memory settings to configure batch (#16134)
7d39619f828 is described below

commit 7d39619f82889fd553c30ac35a0dae35a03ea930
Author: Caideyipi <[email protected]>
AuthorDate: Mon Aug 11 09:47:07 2025 +0800

    Pipe: Use fixed memory settings to configure batch (#16134)
    
    * test
    
    * test
    
    * fix
    
    * fix
---
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  2 -
 .../evolvable/batch/PipeTabletEventBatch.java      | 69 ++--------------------
 2 files changed, 5 insertions(+), 66 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 9be97ea6e17..1b56a00b325 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -160,7 +159,6 @@ class PipeAgentLauncher {
     try (final ConfigNodeClient configNodeClient =
         
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
       final TGetAllPipeInfoResp getAllPipeInfoResp = 
configNodeClient.getAllPipeInfo();
-      PipeTabletEventBatch.init();
       if (getAllPipeInfoResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new StartupException("Failed to get pipe task meta from config 
node.");
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index f66fb32cf5e..0e13feb8ac4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -20,10 +20,6 @@
 package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
-import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -40,7 +36,7 @@ import java.util.Objects;
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
-  private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
+  private final long maxBatchSizeInBytes;
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
   protected final TriLongConsumer recordMetric;
@@ -49,7 +45,6 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   private long firstEventProcessingTime = Long.MIN_VALUE;
 
   protected long totalBufferSize = 0;
-  private final PipeDynamicMemoryBlock allocatedMemoryBlock;
 
   protected volatile boolean isClosed = false;
 
@@ -57,11 +52,10 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
       final TriLongConsumer recordMetric) {
-    if (pipeModelFixedMemoryBlock == null) {
-      init();
-    }
-
     this.maxDelayInMs = maxDelayInMs;
+
+    // limit in buffer size
+    this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes;
     if (recordMetric != null) {
       this.recordMetric = recordMetric;
     } else {
@@ -70,18 +64,6 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
             // do nothing
           };
     }
-
-    // limit in buffer size
-    this.allocatedMemoryBlock =
-        
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
-
-    if (getMaxBatchSizeInBytes() != 
allocatedMemoryBlock.getMemoryUsageInBytes()) {
-      LOGGER.info(
-          "PipeTabletEventBatch: the max batch size is adjusted from {} to {} 
due to the "
-              + "memory restriction",
-          requestMaxBatchSizeInBytes,
-          getMaxBatchSizeInBytes());
-    }
   }
 
   /**
@@ -139,18 +121,13 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
   public boolean shouldEmit() {
     final long diff = System.currentTimeMillis() - firstEventProcessingTime;
-    if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
-      allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) 
diff / maxDelayInMs);
+    if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) {
       recordMetric.accept(diff, totalBufferSize, events.size());
       return true;
     }
     return false;
   }
 
-  private long getMaxBatchSizeInBytes() {
-    return allocatedMemoryBlock.getMemoryUsageInBytes();
-  }
-
   public synchronized void onSuccess() {
     events.clear();
 
@@ -165,10 +142,6 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
     clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
     events.clear();
-
-    if (allocatedMemoryBlock != null) {
-      allocatedMemoryBlock.close();
-    }
   }
 
   /**
@@ -203,38 +176,6 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     return events.isEmpty();
   }
 
-  // please at PipeLauncher call this method to init pipe model fixed memory 
block
-  public static void init() {
-    if (pipeModelFixedMemoryBlock != null) {
-      return;
-    }
-
-    try {
-      long batchSize = 
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch();
-      for (long i = batchSize; i > 0; i = i / 2) {
-        try {
-          pipeModelFixedMemoryBlock =
-              PipeDataNodeResourceManager.memory()
-                  .forceAllocateForModelFixedMemoryBlock(i, 
PipeMemoryBlockType.BATCH);
-
-          LOGGER.info("pipe model fixed memory block initialized with size: {} 
bytes", i);
-          return;
-        } catch (Exception ignore) {
-          // ignore the exception and try to allocate a smaller size
-          LOGGER.info(
-              "pipe model fixed memory block initialized with size: {} bytes 
failed, try smaller size",
-              i);
-        }
-      }
-    } catch (Exception e) {
-      LOGGER.error("init pipe model fixed memory block failed", e);
-      // If the allocation fails, we still need to create a default memory 
block to avoid NPE.
-      pipeModelFixedMemoryBlock =
-          PipeDataNodeResourceManager.memory()
-              .forceAllocateForModelFixedMemoryBlock(0, 
PipeMemoryBlockType.BATCH);
-    }
-  }
-
   @FunctionalInterface
   public interface TriLongConsumer {
     void accept(long l1, long l2, long l3);

Reply via email to