This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch optimize-pipe-after-stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2dd6c83342c6a646c8464da4a295c2655968fb55
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 12:21:08 2025 +0800

    optimize
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 49 +++++++++++++++++++++-
 .../PipeRealtimePriorityBlockingQueue.java         | 37 ++++++++++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 23 +++++++---
 .../iotdb/commons/conf/CommonDescriptor.java       | 14 +++++--
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      | 13 ++++--
 6 files changed, 114 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 34d11913dff..741a2de6803 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -111,6 +111,49 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
   }
 
+  ////////////////////////// Manage by Pipe Name //////////////////////////
+
+  @Override
+  protected void startPipe(final String pipeName, final long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
+    if (PipeStatus.STOPPED.equals(status) || status == null) {
+      restartPipeToReloadResourceIfNeeded(existedPipeMeta);
+    }
+
+    super.startPipe(pipeName, creationTime);
+  }
+
+  private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
+    final AtomicLong lastRestartTime =
+        PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+    if (lastRestartTime != null
+        && System.currentTimeMillis() - lastRestartTime.get()
+            < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+      LOGGER.info(
+          "Skipping reload resource for stopped pipe {} before starting it because reloading resource is too frequent.",
+          pipeMeta.getStaticMeta().getPipeName());
+      return;
+    }
+
+    if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
+      LOGGER.info(
+          "Flushing storage engine before restarting pipe {}.",
+          pipeMeta.getStaticMeta().getPipeName());
+      final long currentTime = System.currentTimeMillis();
+      StorageEngine.getInstance().syncCloseAllProcessor();
+      WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
+      LOGGER.info(
+          "Finished flushing storage engine, time cost: {} ms.",
+          System.currentTimeMillis() - currentTime);
+    }
+
+    restartStuckPipe(pipeMeta);
+    LOGGER.info(
+        "Reloaded resource for stopped pipe {} before starting it.",
+        pipeMeta.getStaticMeta().getPipeName());
+  }
+
   ///////////////////////// Manage by regionGroupId /////////////////////////
 
   @Override
@@ -674,7 +717,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private void restartStuckPipe(final PipeMeta pipeMeta) {
-    LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
+    LOGGER.warn(
+        "Pipe {} will be restarted because it is stuck or has encountered issues such as data backlog or being stopped for too long.",
+        pipeMeta.getStaticMeta());
     acquireWriteLock();
     try {
       final long startTime = System.currentTimeMillis();
@@ -688,7 +733,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       handleSinglePipeMetaChanges(originalPipeMeta);
 
       LOGGER.warn(
-          "Pipe {} was restarted because of stuck, time cost: {} ms.",
+          "Pipe {} was restarted because of stuck or data backlog, time cost: {} ms.",
           originalPipeMeta.getStaticMeta(),
           System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a170b19bd42..ece2b513ddc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -33,6 +33,7 @@ import java.util.Objects;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQueue<Event> {
@@ -42,8 +43,11 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
 
   private final AtomicInteger eventCount = new AtomicInteger(0);
 
-  private static final int pollHistoryThreshold =
-      PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold();
+  private static final int POLL_TSFILE_THRESHOLD =
+      PipeConfig.getInstance().getPipeRealTimeQueuePollTsFileThreshold();
+  private static final int POLL_HISTORICAL_TSFILE_THRESHOLD =
+      Math.max(PipeConfig.getInstance().getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1);
+  private final AtomicLong pollHistoryCounter = new AtomicLong(0);
 
   public PipeRealtimePriorityBlockingQueue() {
     super(new PipeDataRegionEventCounter());
@@ -81,15 +85,21 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
   @Override
   public Event directPoll() {
     Event event = null;
-    if (eventCount.get() >= pollHistoryThreshold) {
-      event = tsfileInsertEventDeque.pollFirst();
+    if (eventCount.get() >= POLL_TSFILE_THRESHOLD) {
+      event =
+          pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+              ? tsfileInsertEventDeque.pollFirst()
+              : tsfileInsertEventDeque.pollLast();
       eventCount.set(0);
     }
     if (Objects.isNull(event)) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
       if (Objects.isNull(event)) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
         eventCount.incrementAndGet();
@@ -113,15 +123,21 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
   @Override
   public Event waitedPoll() {
     Event event = null;
-    if (eventCount.get() >= pollHistoryThreshold) {
-      event = tsfileInsertEventDeque.pollFirst();
+    if (eventCount.get() >= POLL_TSFILE_THRESHOLD) {
+      event =
+          pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+              ? tsfileInsertEventDeque.pollFirst()
+              : tsfileInsertEventDeque.pollLast();
       eventCount.set(0);
     }
     if (event == null) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
       if (event == null && !tsfileInsertEventDeque.isEmpty()) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
         eventCount.incrementAndGet();
@@ -132,7 +148,10 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
     if (Objects.isNull(event)) {
       event = super.waitedPoll();
       if (Objects.isNull(event)) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoryCounter.incrementAndGet() % POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
         eventCount.incrementAndGet();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cb69d5db391..e3bf41954eb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -204,7 +204,8 @@ public class CommonConfig {
 
   private boolean pipeFileReceiverFsyncEnabled = true;
 
-  private int pipeRealTimeQueuePollHistoryThreshold = 1;
+  private int pipeRealTimeQueuePollTsFileThreshold = 10;
+  private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
 
   /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -259,7 +260,7 @@ public class CommonConfig {
 
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10;
-  private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
+  private int pipeMaxAllowedPinnedMemTableCount = 5; // per data region
   private long pipeMaxAllowedLinkedTsFileCount = 300;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
@@ -990,12 +991,22 @@ public class CommonConfig {
     this.pipeSubtaskExecutorForcedRestartIntervalMs = pipeSubtaskExecutorForcedRestartIntervalMs;
   }
 
-  public int getPipeRealTimeQueuePollHistoryThreshold() {
-    return pipeRealTimeQueuePollHistoryThreshold;
+  public int getPipeRealTimeQueuePollTsFileThreshold() {
+    return pipeRealTimeQueuePollTsFileThreshold;
   }
 
-  public void setPipeRealTimeQueuePollHistoryThreshold(int pipeRealTimeQueuePollHistoryThreshold) {
-    this.pipeRealTimeQueuePollHistoryThreshold = pipeRealTimeQueuePollHistoryThreshold;
+  public void setPipeRealTimeQueuePollTsFileThreshold(int pipeRealTimeQueuePollTsFileThreshold) {
+    this.pipeRealTimeQueuePollTsFileThreshold = pipeRealTimeQueuePollTsFileThreshold;
+  }
+
+  public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+    return pipeRealTimeQueuePollHistoricalTsFileThreshold;
+  }
+
+  public void setPipeRealTimeQueuePollHistoricalTsFileThreshold(
+      int pipeRealTimeQueuePollHistoricalTsFileThreshold) {
+    this.pipeRealTimeQueuePollHistoricalTsFileThreshold =
+        pipeRealTimeQueuePollHistoricalTsFileThreshold;
   }
 
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 2afe9bbdeb4..4b813672669 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,11 +300,19 @@ public class CommonDescriptor {
                 String.valueOf(
                     config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
 
-    config.setPipeRealTimeQueuePollHistoryThreshold(
+    config.setPipeRealTimeQueuePollTsFileThreshold(
+        Integer.parseInt(
+            Optional.ofNullable(
+                    properties.getProperty("pipe_realtime_queue_poll_history_threshold"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_realtime_queue_poll_tsfile_threshold",
+                        String.valueOf(config.getPipeRealTimeQueuePollTsFileThreshold())))));
+    config.setPipeRealTimeQueuePollHistoricalTsFileThreshold(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_realtime_queue_poll_history_threshold",
-                Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
+                "pipe_realtime_queue_poll_historical_tsfile_threshold",
+                String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
 
     int pipeSubtaskExecutorMaxThreadNum =
         Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index addd9d1f0bf..41cf6df907d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -576,7 +576,7 @@ public abstract class PipeTaskAgent {
     return true;
   }
 
-  private void startPipe(final String pipeName, final long creationTime) {
+  protected void startPipe(final String pipeName, final long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (!checkBeforeStartPipe(existedPipeMeta, pipeName, creationTime)) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4492df7a5db..8cd4b4d78d5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -78,8 +78,12 @@ public class PipeConfig {
 
   /////////////////////////////// Subtask Connector ///////////////////////////////
 
-  public int getPipeRealTimeQueuePollHistoryThreshold() {
-    return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold();
+  public int getPipeRealTimeQueuePollTsFileThreshold() {
+    return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
+  }
+
+  public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+    return COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
   }
 
   /////////////////////////////// Subtask Executor ///////////////////////////////
@@ -373,7 +377,10 @@ public class PipeConfig {
         getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
 
     LOGGER.info(
-        "PipeRealTimeQueuePollHistoryThreshold: {}", getPipeRealTimeQueuePollHistoryThreshold());
+        "PipeRealTimeQueuePollTsFileThreshold: {}", getPipeRealTimeQueuePollTsFileThreshold());
+    LOGGER.info(
+        "PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
+        getPipeRealTimeQueuePollHistoricalTsFileThreshold());
 
     LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", getPipeSubtaskExecutorMaxThreadNum());
     LOGGER.info(

Reply via email to