This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch optimize-pipe-after-stop in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2dd6c83342c6a646c8464da4a295c2655968fb55 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Mar 10 12:21:08 2025 +0800 optimize --- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 49 +++++++++++++++++++++- .../PipeRealtimePriorityBlockingQueue.java | 37 ++++++++++++---- .../apache/iotdb/commons/conf/CommonConfig.java | 23 +++++++--- .../iotdb/commons/conf/CommonDescriptor.java | 14 +++++-- .../commons/pipe/agent/task/PipeTaskAgent.java | 2 +- .../iotdb/commons/pipe/config/PipeConfig.java | 13 ++++-- 6 files changed, 114 insertions(+), 24 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 34d11913dff..741a2de6803 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -111,6 +111,49 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build(); } + ////////////////////////// Manage by Pipe Name ////////////////////////// + + @Override + protected void startPipe(final String pipeName, final long creationTime) { + final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get(); + if (PipeStatus.STOPPED.equals(status) || status == null) { + restartPipeToReloadResourceIfNeeded(existedPipeMeta); + } + + super.startPipe(pipeName, creationTime); + } + + private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) { + final AtomicLong lastRestartTime = + PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName()); + if (lastRestartTime != null + && System.currentTimeMillis() - lastRestartTime.get() + < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) { + LOGGER.info( + "Skipping reload resource for stopped pipe {} before starting it because reloading resource is too frequent.", + pipeMeta.getStaticMeta().getPipeName()); + return; + } + + if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) { + LOGGER.info( + "Flushing storage engine before restarting pipe {}.", + pipeMeta.getStaticMeta().getPipeName()); + final long currentTime = System.currentTimeMillis(); + StorageEngine.getInstance().syncCloseAllProcessor(); + WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes(); + LOGGER.info( + "Finished flushing storage engine, time cost: {} ms.", + System.currentTimeMillis() - currentTime); + } + + restartStuckPipe(pipeMeta); + LOGGER.info( + "Reloaded resource for stopped pipe {} before starting it.", + pipeMeta.getStaticMeta().getPipeName()); + } + ///////////////////////// Manage by regionGroupId ///////////////////////// @Override @@ -674,7 +717,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } private void restartStuckPipe(final PipeMeta pipeMeta) { - LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta()); + LOGGER.warn( + "Pipe {} will be restarted because it is stuck or has encountered issues such as data backlog or being stopped for too long.", + pipeMeta.getStaticMeta()); acquireWriteLock(); try { final long startTime = System.currentTimeMillis(); @@ -688,7 +733,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { handleSinglePipeMetaChanges(originalPipeMeta); LOGGER.warn( - "Pipe {} was restarted because of stuck, time cost: {} ms.", + "Pipe {} was restarted because of stuck or data backlog, time cost: {} ms.", originalPipeMeta.getStaticMeta(), System.currentTimeMillis() - startTime); } catch (final Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java index a170b19bd42..ece2b513ddc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> { @@ -42,8 +43,11 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ private final AtomicInteger eventCount = new AtomicInteger(0); - private static final int pollHistoryThreshold = - PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold(); + private static final int POLL_TSFILE_THRESHOLD = + PipeConfig.getInstance().getPipeRealTimeQueuePollTsFileThreshold(); + private static final int POLL_HISTORICAL_TSFILE_THRESHOLD = + Math.max(PipeConfig.getInstance().getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1); + private final AtomicLong pollHistoryCounter = new AtomicLong(0); public PipeRealtimePriorityBlockingQueue() { super(new PipeDataRegionEventCounter()); @@ -81,15 +85,21 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ @Override public Event directPoll() { Event event = null; - if (eventCount.get() >= pollHistoryThreshold) { - event = tsfileInsertEventDeque.pollFirst(); + if (eventCount.get() >= POLL_TSFILE_THRESHOLD) { + event = + pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0 + ? tsfileInsertEventDeque.pollFirst() + : tsfileInsertEventDeque.pollLast(); eventCount.set(0); } if (Objects.isNull(event)) { // Sequentially poll the first offered non-TsFileInsertionEvent event = super.directPoll(); if (Objects.isNull(event)) { - event = tsfileInsertEventDeque.pollFirst(); + event = + pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0 + ? tsfileInsertEventDeque.pollFirst() + : tsfileInsertEventDeque.pollLast(); } if (event != null) { eventCount.incrementAndGet(); @@ -113,15 +123,21 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ @Override public Event waitedPoll() { Event event = null; - if (eventCount.get() >= pollHistoryThreshold) { - event = tsfileInsertEventDeque.pollFirst(); + if (eventCount.get() >= POLL_TSFILE_THRESHOLD) { + event = + pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0 + ? tsfileInsertEventDeque.pollFirst() + : tsfileInsertEventDeque.pollLast(); eventCount.set(0); } if (event == null) { // Sequentially poll the first offered non-TsFileInsertionEvent event = super.directPoll(); if (event == null && !tsfileInsertEventDeque.isEmpty()) { - event = tsfileInsertEventDeque.pollFirst(); + event = + pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0 + ? tsfileInsertEventDeque.pollFirst() + : tsfileInsertEventDeque.pollLast(); } if (event != null) { eventCount.incrementAndGet(); @@ -132,7 +148,10 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ if (Objects.isNull(event)) { event = super.waitedPoll(); if (Objects.isNull(event)) { - event = tsfileInsertEventDeque.pollFirst(); + event = + pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0 + ? tsfileInsertEventDeque.pollFirst() + : tsfileInsertEventDeque.pollLast(); } if (event != null) { eventCount.incrementAndGet(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index cb69d5db391..e3bf41954eb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -204,7 +204,8 @@ public class CommonConfig { private boolean pipeFileReceiverFsyncEnabled = true; - private int pipeRealTimeQueuePollHistoryThreshold = 1; + private int pipeRealTimeQueuePollTsFileThreshold = 10; + private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3; /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ private int pipeSubtaskExecutorMaxThreadNum = @@ -259,7 +260,7 @@ public class CommonConfig { private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100; private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10; - private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region + private int pipeMaxAllowedPinnedMemTableCount = 5; // per data region private long pipeMaxAllowedLinkedTsFileCount = 300; private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 120; @@ -990,12 +991,22 @@ public class CommonConfig { this.pipeSubtaskExecutorForcedRestartIntervalMs = pipeSubtaskExecutorForcedRestartIntervalMs; } - public int getPipeRealTimeQueuePollHistoryThreshold() { - return pipeRealTimeQueuePollHistoryThreshold; + public int getPipeRealTimeQueuePollTsFileThreshold() { + return pipeRealTimeQueuePollTsFileThreshold; } - public void setPipeRealTimeQueuePollHistoryThreshold(int pipeRealTimeQueuePollHistoryThreshold) { - this.pipeRealTimeQueuePollHistoryThreshold = pipeRealTimeQueuePollHistoryThreshold; + public void setPipeRealTimeQueuePollTsFileThreshold(int pipeRealTimeQueuePollTsFileThreshold) { + this.pipeRealTimeQueuePollTsFileThreshold = pipeRealTimeQueuePollTsFileThreshold; + } + + public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() { + return pipeRealTimeQueuePollHistoricalTsFileThreshold; + } + + public void setPipeRealTimeQueuePollHistoricalTsFileThreshold( + int pipeRealTimeQueuePollHistoricalTsFileThreshold) { + this.pipeRealTimeQueuePollHistoricalTsFileThreshold = + pipeRealTimeQueuePollHistoricalTsFileThreshold; } public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 2afe9bbdeb4..4b813672669 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -300,11 +300,19 @@ public class CommonDescriptor { String.valueOf( config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold())))); - config.setPipeRealTimeQueuePollHistoryThreshold( + config.setPipeRealTimeQueuePollTsFileThreshold( + Integer.parseInt( + Optional.ofNullable( + properties.getProperty("pipe_realtime_queue_poll_history_threshold")) + .orElse( + properties.getProperty( + "pipe_realtime_queue_poll_tsfile_threshold", + String.valueOf(config.getPipeRealTimeQueuePollTsFileThreshold()))))); + config.setPipeRealTimeQueuePollHistoricalTsFileThreshold( Integer.parseInt( properties.getProperty( - "pipe_realtime_queue_poll_history_threshold", - Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold())))); + "pipe_realtime_queue_poll_historical_tsfile_threshold", + String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold())))); int pipeSubtaskExecutorMaxThreadNum = Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index addd9d1f0bf..41cf6df907d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -576,7 +576,7 @@ public abstract class PipeTaskAgent { return true; } - private void startPipe(final String pipeName, final long creationTime) { + protected void startPipe(final String pipeName, final long creationTime) { final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); if (!checkBeforeStartPipe(existedPipeMeta, pipeName, creationTime)) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 4492df7a5db..8cd4b4d78d5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -78,8 +78,12 @@ public class PipeConfig { /////////////////////////////// Subtask Connector /////////////////////////////// - public int getPipeRealTimeQueuePollHistoryThreshold() { - return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold(); + public int getPipeRealTimeQueuePollTsFileThreshold() { + return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold(); + } + + public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() { + return COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold(); } /////////////////////////////// Subtask Executor /////////////////////////////// @@ -373,7 +377,10 @@ public class PipeConfig { getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()); LOGGER.info( - "PipeRealTimeQueuePollHistoryThreshold: {}", getPipeRealTimeQueuePollHistoryThreshold()); + "PipeRealTimeQueuePollTsFileThreshold: {}", getPipeRealTimeQueuePollTsFileThreshold()); + LOGGER.info( + "PipeRealTimeQueuePollHistoricalTsFileThreshold: {}", + getPipeRealTimeQueuePollHistoricalTsFileThreshold()); LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", getPipeSubtaskExecutorMaxThreadNum()); LOGGER.info(
