This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch optimize-pipe-after-stop in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22ed3442c0c97eb9cc8f369071eb362049c5dd75 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Mar 10 12:36:37 2025 +0800 op --- .../realtime/PipeRealtimeDataRegionHybridExtractor.java | 7 +++++++ .../storageengine/dataregion/wal/utils/WALInsertNodeCache.java | 7 +++++-- .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 10 ++++++++++ .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++ .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java | 7 +++++++ 5 files changed, 34 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 26cc157e7ee..b1baca7c42a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -48,6 +48,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class); + private final boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled = + PipeConfig.getInstance().isPipeEpochKeepTsFileAfterStuckRestartEnabled(); + @Override protected void doExtract(final PipeRealtimeEvent event) { final Event eventToExtract = event.getEvent(); @@ -223,6 +226,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) { + if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) { + return false; + } + final boolean isPipeTaskCurrentlyRestarted = PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName); if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java index 76857c04983..0541ff96188 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.utils; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -58,6 +59,7 @@ public class WALInsertNodeCache { private static final Logger LOGGER = LoggerFactory.getLogger(WALInsertNodeCache.class); private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); private final PipeMemoryBlock allocatedMemoryBlock; // Used to adjust the memory usage of the cache @@ -75,8 +77,9 @@ public class WALInsertNodeCache { final long requestedAllocateSize = (long) Math.min( - (double) 2 * CONFIG.getWalFileSizeThresholdInByte(), - CONFIG.getAllocateMemoryForPipe() * 0.8 / 5); + (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount() + * CONFIG.getWalFileSizeThresholdInByte(), + CONFIG.getAllocateMemoryForPipe() * 0.45); allocatedMemoryBlock = PipeDataNodeResourceManager.memory() .tryAllocate(requestedAllocateSize) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index e3bf41954eb..340e7e26462 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -265,6 +265,7 @@ public class CommonConfig { private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 120; private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes + private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false; private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE; private int pipeMetaReportMaxLogNumPerRound = 10; @@ -1088,6 +1089,10 @@ public class CommonConfig { return pipeStuckRestartMinIntervalMs; } + public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() { + return pipeEpochKeepTsFileAfterStuckRestartEnabled; + } + public long getPipeStorageEngineFlushTimeIntervalMs() { return pipeStorageEngineFlushTimeIntervalMs; } @@ -1100,6 +1105,11 @@ public class CommonConfig { this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs; } + public void setPipeEpochKeepTsFileAfterStuckRestartEnabled( + boolean pipeEpochKeepTsFileAfterStuckRestartEnabled) { + this.pipeEpochKeepTsFileAfterStuckRestartEnabled = pipeEpochKeepTsFileAfterStuckRestartEnabled; + } + public void setPipeStorageEngineFlushTimeIntervalMs(long pipeStorageEngineFlushTimeIntervalMs) { this.pipeStorageEngineFlushTimeIntervalMs = pipeStorageEngineFlushTimeIntervalMs; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 4b813672669..10b6978149c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -540,6 +540,11 @@ public class CommonDescriptor { properties.getProperty( "pipe_stuck_restart_min_interval_ms", String.valueOf(config.getPipeStuckRestartMinIntervalMs())))); + config.setPipeEpochKeepTsFileAfterStuckRestartEnabled( + Boolean.parseBoolean( + properties.getProperty( + "pipe_epoch_keep_tsfile_after_stuck_restart_enabled", + String.valueOf(config.isPipeEpochKeepTsFileAfterStuckRestartEnabled())))); config.setPipeStorageEngineFlushTimeIntervalMs( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 8cd4b4d78d5..c0cc1a4d9fb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -268,6 +268,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs(); } + public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() { + return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled(); + } + public long getPipeStorageEngineFlushTimeIntervalMs() { return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs(); } @@ -475,6 +479,9 @@ public class PipeConfig { getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()); LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds()); LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs()); + LOGGER.info( + "PipeEpochKeepTsFileAfterStuckRestartEnabled: {}", + isPipeEpochKeepTsFileAfterStuckRestartEnabled()); LOGGER.info( "PipeStorageEngineFlushTimeIntervalMs: {}", getPipeStorageEngineFlushTimeIntervalMs());
