This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 977086e33bfd3e3a4fad5140e19eb2458e55a45a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 12:07:50 2025 +0800

    [To dev/1.3] Pipe: Enabled waiting for pipes to finish & progress index 
persist to config node in shutdown hook (#15896)(#15901)
    
    * Pipe: Enabled waiting for pipes to finish & progress index persist to 
config node in shutdown hook (#15896)
    
    * persist in shutdown hook
    
    * Update PipeTaskAgent.java
    
    * Update PipeConfigNodeTaskAgent.java
    
    * Update DataNodeShutdownHook.java
    
    * Fix
    
    * Fix2
    
    * Fix3
    
    * Update PipeHeartbeatScheduler.java
    
    * Update ShowPipeTask.java
---
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |  2 +-
 .../handlers/rpc/PipeHeartbeatRPCHandler.java      |  2 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 18 ++++++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 ++
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |  2 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++++
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 41 ++++++++++++++++------
 .../subtask/processor/PipeProcessorSubtask.java    |  4 +--
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  6 ++--
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  6 ++--
 .../common/tablet/PipeRawTabletInsertionEvent.java |  6 ++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  6 ++--
 .../dataregion/IoTDBDataRegionExtractor.java       |  4 +--
 .../PipeRealtimeDataRegionHybridExtractor.java     |  5 ++-
 .../schemaregion/IoTDBSchemaRegionExtractor.java   |  4 +--
 .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java  |  6 ++--
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 18 +++++++++-
 ...ics.java => PipeDataNodeSinglePipeMetrics.java} | 15 ++++----
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  8 +++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |  4 +--
 .../iotdb/db/service/DataNodeShutdownHook.java     | 37 +++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 14 ++++++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 +++
 .../thrift-commons/src/main/thrift/common.thrift   |  7 ++++
 .../src/main/thrift/confignode.thrift              |  3 ++
 .../src/main/thrift/datanode.thrift                |  9 +----
 30 files changed, 193 insertions(+), 59 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 6e2a9dc97cc..69021eaec1a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
@@ -32,7 +33,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
index 569424afbff..e5fa157961d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 0a207ab7238..8bb92f9d2bb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
@@ -2562,6 +2563,23 @@ public class ConfigManager implements IManager {
         : new TThrottleQuotaResp(status);
   }
 
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp 
resp) {
+    final TSStatus status = confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+    pipeManager
+        .getPipeRuntimeCoordinator()
+        .parseHeartbeat(
+            dataNodeId,
+            resp.getPipeMetaList(),
+            resp.getPipeCompletedList(),
+            resp.getPipeRemainingEventCountList(),
+            resp.getPipeRemainingTimeList());
+    return StatusUtils.OK;
+  }
+
   @Override
   public DataSet registerAINode(TAINodeRegisterReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 657f068e674..88731538be2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
@@ -823,4 +824,6 @@ public interface IManager {
 
   /** Set space quota. */
   TSStatus setSpaceQuota(TSetSpaceQuotaReq req);
+
+  TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 0e90174a7f2..b115ba25b4a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -35,7 +36,6 @@ import 
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeConfigNodeRe
 import 
org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
 import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 3533b40158d..120ddb65f86 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -31,7 +32,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index b8ec6f773bd..21dbccd07a7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
@@ -1298,4 +1299,9 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   public TThrottleQuotaResp getThrottleQuota() {
     return configManager.getThrottleQuota();
   }
+
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp 
resp) {
+    return configManager.pushHeartbeat(dataNodeId, resp);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ef0d4c6b4d9..1513d8ebdeb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -48,11 +50,14 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -63,10 +68,10 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.SystemMetric;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
@@ -306,13 +311,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   @Override
   protected void thawRate(final String pipeName, final long creationTime) {
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + 
"_" + creationTime);
+    PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + 
creationTime);
   }
 
   @Override
   protected void freezeRate(final String pipeName, final long creationTime) {
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-        .freezeRate(pipeName + "_" + creationTime);
+    PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + 
creationTime);
   }
 
   @Override
@@ -323,7 +327,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     final String taskId = pipeName + "_" + creationTime;
     PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+    PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
 
     return true;
   }
@@ -351,7 +355,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
       final String taskId = pipeName + "_" + creationTime;
       PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
-      
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+      PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
       // When the pipe contains no pipe tasks, there is no corresponding 
prefetching queue for the
       // subscribed pipe, so the subscription needs to be manually marked as 
completed.
       if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
@@ -445,7 +449,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
         pipeCompletedList.add(isCompleted);
         pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -475,7 +479,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   protected void collectPipeMetaListInternal(
       final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
-    if (PipeDataNodeAgent.runtime().isShutdown()) {
+    if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != 
Long.MIN_VALUE) {
       return;
     }
     LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
@@ -528,7 +532,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
         pipeCompletedList.add(isCompleted);
         pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -817,6 +821,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  public void persistAllProgressIndex() {
+    try (final ConfigNodeClient configNodeClient =
+        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+      // Send request to some API server
+      final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+      collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+      final TSStatus result =
+          configNodeClient.pushHeartbeat(
+              IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+        LOGGER.warn("Failed to persist progress index to configNode, status: 
{}", result);
+      }
+    } catch (final Exception e) {
+      LOGGER.warn(e.getMessage());
+    }
+  }
+
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index c8a0acf32fa..d39feb96fe0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -140,7 +140,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
         } else if (event instanceof TsFileInsertionEvent) {
           pipeProcessor.process((TsFileInsertionEvent) event, 
outputEventCollector);
           PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
-          PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+          PipeDataNodeSinglePipeMetrics.getInstance()
               .markTsFileCollectInvocationCount(
                   pipeNameWithCreationTime, 
outputEventCollector.getCollectInvocationCount());
         } else if (event instanceof PipeHeartbeatEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 19c3b0ca996..ad29b285442 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -83,7 +83,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .increaseHeartbeatEventCount(pipeName, creationTime);
     }
     return true;
@@ -94,7 +94,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     // PipeName == null indicates that the event is the raw event at disruptor,
     // not the event copied and passed to the extractor
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseHeartbeatEventCount(pipeName, creationTime);
       if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
         LOGGER.debug(this.toString());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index e7814916e5d..80ac86310b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -160,7 +160,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     try {
       PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .increaseInsertNodeEventCount(pipeName, creationTime);
         PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
       }
@@ -196,7 +196,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .decreaseInsertNodeEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 95a532708e0..9f3ad7a1bdf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -28,7 +28,7 @@ import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.Pi
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -130,7 +130,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
             allocatedMemoryBlock,
             PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .increaseRawTabletEventCount(pipeName, creationTime);
     }
     return true;
@@ -139,7 +139,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseRawTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 3aa9b4a6b13..41931131eb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -32,7 +32,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -266,7 +266,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
       return false;
     } finally {
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .increaseTsFileEventCount(pipeName, creationTime);
       }
     }
@@ -290,7 +290,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
       return false;
     } finally {
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .decreaseTsFileEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 808df12dfac..aa2f849f56f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -34,7 +34,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -358,7 +358,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
     PipeTsFileToTabletsMetrics.getInstance().register(this);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index c9d16163fc8..500a195e600 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -239,8 +239,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
   private boolean mayRemainingInsertNodeEventExceedLimit(final 
PipeRealtimeEvent event) {
     final boolean mayRemainingInsertEventExceedLimit =
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .mayRemainingInsertEventExceedLimit(pipeID);
+        
PipeDataNodeSinglePipeMetrics.getInstance().mayRemainingInsertEventExceedLimit(pipeID);
     if (mayRemainingInsertEventExceedLimit && 
event.mayExtractorUseTablets(this)) {
       logByLogManager(
           l ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
index b1153071bab..92fc23775e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
@@ -32,7 +32,7 @@ import 
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionExtractorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -75,7 +75,7 @@ public class IoTDBSchemaRegionExtractor extends 
IoTDBNonDataRegionExtractor {
     listenedTypeSet = 
SchemaRegionListeningFilter.parseListeningPlanTypeSet(parameters);
 
     PipeSchemaRegionExtractorMetrics.getInstance().register(this);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 3f03ce580fd..dd3873feaf4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.metric;
 
 import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
@@ -53,7 +53,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionListenerMetrics.getInstance().bindTo(metricService);
     PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
-    
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
+    PipeDataNodeSinglePipeMetrics.getInstance().bindTo(metricService);
     PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
     PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
   }
@@ -71,7 +71,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionListenerMetrics.getInstance().unbindFrom(metricService);
     PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
-    
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
+    PipeDataNodeSinglePipeMetrics.getInstance().unbindFrom(metricService);
     PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
     PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 86368acf353..73d58285345 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
+public class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
 
   // Calculate from schema region extractors directly for it requires less 
computation
   private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
@@ -107,6 +107,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     return insertNodeEventCountEMA.insertNodeEMAValue;
   }
 
+  public long getRemainingNonHeartbeatEvents() {
+    final long remainingEvents =
+        tsfileEventCount.get()
+            + rawTabletEventCount.get()
+            + insertNodeEventCount.get()
+            + schemaRegionExtractors.stream()
+                .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
+                .reduce(Long::sum)
+                .orElse(0L);
+
+    // There are cases where the indicator is negative. For example, after the 
Pipe is restarted,
+    // the Processor SubTask is still collecting Events, resulting in a 
negative count. This
+    // situation cannot be avoided because the Pipe may be restarted 
internally.
+    return remainingEvents >= 0 ? remainingEvents : 0;
+  }
+
   long getRemainingEvents() {
     final long remainingEvents =
         tsfileEventCount.get()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 354a980edfd..677d758a162 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -42,15 +42,14 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
+public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeMetrics.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeSinglePipeMetrics.class);
 
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
+  public final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
       remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>();
 
   private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
@@ -381,19 +380,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
 
   private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
 
-    private static final PipeDataNodeRemainingEventAndTimeMetrics INSTANCE =
-        new PipeDataNodeRemainingEventAndTimeMetrics();
+    private static final PipeDataNodeSinglePipeMetrics INSTANCE =
+        new PipeDataNodeSinglePipeMetrics();
 
     private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
       // Empty constructor
     }
   }
 
-  public static PipeDataNodeRemainingEventAndTimeMetrics getInstance() {
+  public static PipeDataNodeSinglePipeMetrics getInstance() {
     return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
   }
 
-  private PipeDataNodeRemainingEventAndTimeMetrics() {
+  private PipeDataNodeSinglePipeMetrics() {
     
PipeEventCommitManager.getInstance().setCommitRateMarker(this::markRegionCommit);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 979c18320ca..2825eefaf4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
@@ -1271,6 +1272,13 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.getThrottleQuota(), resp -> 
!updateConfigNodeLeader(resp.status));
   }
 
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp 
resp)
+      throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.pushHeartbeat(dataNodeId, resp), status -> 
!updateConfigNodeLeader(status));
+  }
+
   public static class Factory extends ThriftClientFactory<ConfigRegionId, 
ConfigNodeClient> {
 
     public Factory(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9eb5446f0cb..ec1c3686f24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -211,7 +212,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import 
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 687a438f9b3..16ee005a089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -97,7 +97,7 @@ public class ShowPipeTask implements IConfigTask {
 
       if (remainingEventCount == -1 && remainingTime == -1) {
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(tPipeInfo.getId(), 
tPipeInfo.getCreationTime());
         remainingEventCount = remainingEventAndTime.getLeft();
         remainingTime = remainingEventAndTime.getRight();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 13b87f5bb83..8dd3d39c92c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -25,10 +25,14 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -42,6 +46,8 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class DataNodeShutdownHook extends Thread {
 
   private static final Logger logger = 
LoggerFactory.getLogger(DataNodeShutdownHook.class);
@@ -86,6 +92,37 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
+    long startTime = System.currentTimeMillis();
+    if (PipeDataNodeAgent.task().getPipeCount() != 0) {
+      for (Map.Entry<String, PipeDataNodeRemainingEventAndTimeOperator> entry :
+          
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet())
 {
+        boolean timeout = false;
+        while (true) {
+          if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+            logger.info(
+                "Successfully waited for pipe {} to finish.", 
entry.getValue().getPipeName());
+            break;
+          }
+          if (System.currentTimeMillis() - startTime
+              > PipeConfig.getInstance().getPipeMaxWaitFinishTime()) {
+            timeout = true;
+            break;
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.info("Interrupted when waiting for pipe to finish");
+          }
+        }
+        if (timeout) {
+          logger.info("Timed out when waiting for pipes to finish, will 
break");
+          break;
+        }
+      }
+    }
+    // Persist progress index before shutdown to accurate recovery after 
restart
+    PipeDataNodeAgent.task().persistAllProgressIndex();
     // Shutdown pipe progressIndex background service
     PipePeriodicalJobExecutor.shutdownBackgroundService();
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 038581bf1f5..97312b61400 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -232,6 +232,8 @@ public class CommonConfig {
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
   private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
 
+  private long pipeMaxWaitFinishTime = 10 * 1000;
+
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; 
// 50B
   private int pipeExtractorMatcherCacheSize = 1024;
@@ -1380,6 +1382,18 @@ public class CommonConfig {
         pipeSubtaskExecutorForcedRestartIntervalMs);
   }
 
+  public long getPipeMaxWaitFinishTime() {
+    return pipeMaxWaitFinishTime;
+  }
+
+  public void setPipeMaxWaitFinishTime(long pipeMaxWaitFinishTime) {
+    if (this.pipeMaxWaitFinishTime == pipeMaxWaitFinishTime) {
+      return;
+    }
+    this.pipeMaxWaitFinishTime = pipeMaxWaitFinishTime;
+    logger.info("pipeMaxWaitFinishTime is set to {}.", pipeMaxWaitFinishTime);
+  }
+
   public int getPipeRealTimeQueuePollTsFileThreshold() {
     return pipeRealTimeQueuePollTsFileThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 353da8c84e4..a88232161c8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
@@ -35,7 +36,6 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index be0c70d7f42..94a65c01cf0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -147,6 +147,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
   }
 
+  public long getPipeMaxWaitFinishTime() {
+    return COMMON_CONFIG.getPipeMaxWaitFinishTime();
+  }
+
   /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {
@@ -515,6 +519,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
         getPipeSubtaskExecutorForcedRestartIntervalMs());
+    LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
         "PipeExtractorAssignerDisruptorRingBufferSize: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 7086ff731bf..8bc3c384b01 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -591,6 +591,11 @@ public class PipeDescriptor {
                 "pipe_threshold_allocation_strategy_high_usage_threshold",
                 String.valueOf(
                     
config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
+
+    config.setPipeMaxWaitFinishTime(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_max_wait_finish_time", 
String.valueOf(config.getPipeMaxWaitFinishTime()))));
   }
 
   public static void loadPipeExternalConfig(
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index aed1b5f88f5..73aef5c4c59 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -196,6 +196,13 @@ struct TSetThrottleQuotaReq {
   2: required TThrottleQuota throttleQuota
 }
 
+struct TPipeHeartbeatResp {
+  1: required list<binary> pipeMetaList
+  2: optional list<bool> pipeCompletedList
+  3: optional list<i64> pipeRemainingEventCountList
+  4: optional list<double> pipeRemainingTimeList
+}
+
 struct TLicense {
     1: required i64 licenseIssueTimestamp
     2: required i64 expireTimestamp
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 4642f3e20af..19e8e2a7d1f 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -1771,5 +1771,8 @@ service IConfigNodeRPCService {
 
   /** Get throttle quota information */
   TThrottleQuotaResp getThrottleQuota()
+
+  /** Push heartbeat in shutdown */
+  common.TSStatus pushHeartbeat(i32 dataNodeId, common.TPipeHeartbeatResp resp)
 }
 
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index d58e1eb1591..f7a9c4c3854 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -313,13 +313,6 @@ struct TPipeHeartbeatReq {
   1: required i64 heartbeatId
 }
 
-struct TPipeHeartbeatResp {
-  1: required list<binary> pipeMetaList
-  2: optional list<bool> pipeCompletedList
-  3: optional list<i64> pipeRemainingEventCountList
-  4: optional list<double> pipeRemainingTimeList
-}
-
 enum TSchemaLimitLevel{
     DEVICE,
     TIMESERIES
@@ -1041,7 +1034,7 @@ service IDataNodeRPCService {
   /**
   * ConfigNode will ask DataNode for pipe meta in every few seconds
   **/
-  TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
+  common.TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
 
  /**
   * Execute CQ on DataNode

Reply via email to