This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d39619f828 Pipe: Use fixed memory settings to configure batch (#16134)
7d39619f828 is described below
commit 7d39619f82889fd553c30ac35a0dae35a03ea930
Author: Caideyipi <[email protected]>
AuthorDate: Mon Aug 11 09:47:07 2025 +0800
Pipe: Use fixed memory settings to configure batch (#16134)
* test
* test
* fix
* fix
---
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 2 -
.../evolvable/batch/PipeTabletEventBatch.java | 69 ++--------------------
2 files changed, 5 insertions(+), 66 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 9be97ea6e17..1b56a00b325 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -31,7 +31,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -160,7 +159,6 @@ class PipeAgentLauncher {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
- PipeTabletEventBatch.init();
if (getAllPipeInfoResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StartupException("Failed to get pipe task meta from config node.");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index f66fb32cf5e..0e13feb8ac4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -20,10 +20,6 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
-import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
@@ -40,7 +36,7 @@ import java.util.Objects;
public abstract class PipeTabletEventBatch implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
- private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
+ private final long maxBatchSizeInBytes;
protected final List<EnrichedEvent> events = new ArrayList<>();
protected final TriLongConsumer recordMetric;
@@ -49,7 +45,6 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private long firstEventProcessingTime = Long.MIN_VALUE;
protected long totalBufferSize = 0;
- private final PipeDynamicMemoryBlock allocatedMemoryBlock;
protected volatile boolean isClosed = false;
@@ -57,11 +52,10 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
final TriLongConsumer recordMetric) {
- if (pipeModelFixedMemoryBlock == null) {
- init();
- }
-
this.maxDelayInMs = maxDelayInMs;
+
+ // limit in buffer size
+ this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes;
if (recordMetric != null) {
this.recordMetric = recordMetric;
} else {
@@ -70,18 +64,6 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
// do nothing
};
}
-
- // limit in buffer size
- this.allocatedMemoryBlock =
- pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
-
- if (getMaxBatchSizeInBytes() != allocatedMemoryBlock.getMemoryUsageInBytes()) {
- LOGGER.info(
- "PipeTabletEventBatch: the max batch size is adjusted from {} to {} due to the "
- + "memory restriction",
- requestMaxBatchSizeInBytes,
- getMaxBatchSizeInBytes());
- }
}
/**
@@ -139,18 +121,13 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
public boolean shouldEmit() {
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
- if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
- allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
+ if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) {
recordMetric.accept(diff, totalBufferSize, events.size());
return true;
}
return false;
}
- private long getMaxBatchSizeInBytes() {
- return allocatedMemoryBlock.getMemoryUsageInBytes();
- }
-
public synchronized void onSuccess() {
events.clear();
@@ -165,10 +142,6 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
events.clear();
-
- if (allocatedMemoryBlock != null) {
- allocatedMemoryBlock.close();
- }
}
/**
@@ -203,38 +176,6 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
return events.isEmpty();
}
- // please at PipeLauncher call this method to init pipe model fixed memory block
- public static void init() {
- if (pipeModelFixedMemoryBlock != null) {
- return;
- }
-
- try {
- long batchSize = PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch();
- for (long i = batchSize; i > 0; i = i / 2) {
- try {
- pipeModelFixedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(i, PipeMemoryBlockType.BATCH);
-
- LOGGER.info("pipe model fixed memory block initialized with size: {} bytes", i);
- return;
- } catch (Exception ignore) {
- // ignore the exception and try to allocate a smaller size
- LOGGER.info(
- "pipe model fixed memory block initialized with size: {} bytes failed, try smaller size",
- i);
- }
- }
- } catch (Exception e) {
- LOGGER.error("init pipe model fixed memory block failed", e);
- // If the allocation fails, we still need to create a default memory block to avoid NPE.
- pipeModelFixedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(0, PipeMemoryBlockType.BATCH);
- }
- }
-
@FunctionalInterface
public interface TriLongConsumer {
void accept(long l1, long l2, long l3);