This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a67740018a [IoTV2] Add ConfigNode-side consensus pipe guardian and 
remove deprecated DataNode-side checking (#17277)
9a67740018a is described below

commit 9a67740018af8eb2b2f69fce23c0634903e76921
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Mar 12 05:12:12 2026 -0500

    [IoTV2] Add ConfigNode-side consensus pipe guardian and remove deprecated 
DataNode-side checking (#17277)
    
    * add consensus pipe check to config node
    
    * add ut for consensus pipe checker
    
    * remove datanode deprecated code
    
    * fix review
---
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   |  20 ++
 .../pipe/coordinator/task/PipeTaskCoordinator.java |   6 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  14 +
 .../procedure/env/RegionMaintainHandler.java       |  89 +++++++
 .../pipe/PipeTaskInfoConsensusPipeTest.java        | 152 +++++++++++
 .../RegionMaintainHandlerConsensusPipeTest.java    | 290 +++++++++++++++++++++
 .../consensus/config/PipeConsensusConfig.java      |  49 +---
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  47 ----
 .../consensus/pipe/PipeConsensusServerImpl.java    |  43 ---
 .../pipe/consensuspipe/ConsensusPipeGuardian.java  |  26 --
 .../pipe/consensuspipe/ConsensusPipeSelector.java  |  28 --
 .../db/consensus/DataRegionConsensusImpl.java      |   6 -
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  21 --
 .../ConsensusPipeDataNodeRuntimeAgentGuardian.java |  48 ----
 14 files changed, 573 insertions(+), 266 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 1df5ac021ca..959dc7f4363 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -55,6 +55,9 @@ public class PipeMetaSyncer {
   private Future<?> metaSyncFuture;
 
   private final AtomicInteger pipeAutoRestartRoundCounter = new 
AtomicInteger(0);
+  private final AtomicInteger consensusPipeCheckRoundCounter = new 
AtomicInteger(0);
+
+  private static final int CONSENSUS_PIPE_CHECK_INTERVAL_ROUND = 5;
 
   private final boolean pipeAutoRestartEnabled =
       PipeConfig.getInstance().getPipeAutoRestartEnabled();
@@ -110,6 +113,11 @@ public class PipeMetaSyncer {
       pipeAutoRestartRoundCounter.set(0);
     }
 
+    if (consensusPipeCheckRoundCounter.incrementAndGet() >= 
CONSENSUS_PIPE_CHECK_INTERVAL_ROUND) {
+      consensusPipeCheckRoundCounter.set(0);
+      checkAndRepairConsensusPipes();
+    }
+
     final TSStatus metaSyncStatus = procedureManager.pipeMetaSync();
 
     if (metaSyncStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -158,6 +166,18 @@ public class PipeMetaSyncer {
     }
   }
 
+  private void checkAndRepairConsensusPipes() {
+    try {
+      configManager
+          .getProcedureManager()
+          .getEnv()
+          .getRegionMaintainHandler()
+          .checkAndRepairConsensusPipes();
+    } catch (Exception e) {
+      LOGGER.warn("Failed to check and repair consensus pipes", e);
+    }
+  }
+
   private boolean handleSuccessfulRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
         configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index ea9c61cf45e..e44e80075cd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import 
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTaskCoordinator {
@@ -245,6 +247,10 @@ public class PipeTaskCoordinator {
     return !pipeTaskInfo.isEmpty();
   }
 
+  public Map<String, PipeStatus> getConsensusPipeStatusMap() {
+    return pipeTaskInfo.getConsensusPipeStatusMap();
+  }
+
   /** Caller should ensure that the method is called in the write lock of 
{@link #pipeTaskInfo}. */
   public void updateLastSyncedVersion() {
     pipeTaskInfo.updateLastSyncedVersion();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 929024a0689..78cf9a10eee 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -563,6 +563,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public Map<String, PipeStatus> getConsensusPipeStatusMap() {
+    acquireReadLock();
+    try {
+      return 
StreamSupport.stream(pipeMetaKeeper.getPipeMetaList().spliterator(), false)
+          .filter(pipeMeta -> 
PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()))
+          .collect(
+              Collectors.toMap(
+                  pipeMeta -> pipeMeta.getStaticMeta().getPipeName(),
+                  pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get()));
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   public boolean isEmpty() {
     acquireReadLock();
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index f827adea5d4..0d6f306694b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
@@ -509,6 +510,94 @@ public class RegionMaintainHandler {
     }
   }
 
+  /**
+   * Periodically called by PipeMetaSyncer to reconcile expected consensus 
pipes (derived from
+   * PartitionManager replica sets) with actually existing consensus pipes 
(from PipeTaskInfo).
+   * Creates missing pipes, drops unexpected pipes, and restarts stopped pipes.
+   */
+  public void checkAndRepairConsensusPipes() {
+    if (!IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass())) {
+      return;
+    }
+
+    Map<TConsensusGroupId, TRegionReplicaSet> allDataRegionReplicaSets =
+        
configManager.getPartitionManager().getAllReplicaSetsMap(TConsensusGroupType.DataRegion);
+
+    // Build expected pipe name -> TRegionReplicaSet for creation purposes
+    Map<String, TRegionReplicaSet> expectedPipeToReplicaSet = new HashMap<>();
+    for (TRegionReplicaSet replicaSet : allDataRegionReplicaSets.values()) {
+      List<TDataNodeLocation> locations = replicaSet.getDataNodeLocations();
+      DataRegionId regionId = new 
DataRegionId(replicaSet.getRegionId().getId());
+      for (int i = 0; i < locations.size(); i++) {
+        for (int j = 0; j < locations.size(); j++) {
+          if (i != j) {
+            String pipeName =
+                new ConsensusPipeName(
+                        regionId,
+                        locations.get(i).getDataNodeId(),
+                        locations.get(j).getDataNodeId())
+                    .toString();
+            expectedPipeToReplicaSet.put(pipeName, replicaSet);
+          }
+        }
+      }
+    }
+
+    Map<String, PipeStatus> actualPipes =
+        
configManager.getPipeManager().getPipeTaskCoordinator().getConsensusPipeStatusMap();
+
+    // Create missing pipes
+    for (Map.Entry<String, TRegionReplicaSet> entry : 
expectedPipeToReplicaSet.entrySet()) {
+      String pipeName = entry.getKey();
+      if (!actualPipes.containsKey(pipeName)) {
+        LOGGER.warn(
+            "[ConsensusPipeGuardian] consensus pipe [{}] missing, creating 
asynchronously",
+            pipeName);
+        TRegionReplicaSet replicaSet = entry.getValue();
+        ConsensusPipeName parsed = new ConsensusPipeName(pipeName);
+        TDataNodeLocation senderLocation =
+            findLocationByNodeId(replicaSet.getDataNodeLocations(), 
parsed.getSenderDataNodeId());
+        TDataNodeLocation receiverLocation =
+            findLocationByNodeId(replicaSet.getDataNodeLocations(), 
parsed.getReceiverDataNodeId());
+        if (senderLocation != null && receiverLocation != null) {
+          createSingleConsensusPipeAsync(
+              replicaSet.getRegionId(),
+              senderLocation.getDataNodeId(),
+              senderLocation.getDataRegionConsensusEndPoint(),
+              receiverLocation.getDataNodeId(),
+              receiverLocation.getDataRegionConsensusEndPoint());
+        }
+      }
+    }
+
+    // Drop unexpected pipes and restart stopped pipes
+    for (Map.Entry<String, PipeStatus> entry : actualPipes.entrySet()) {
+      String pipeName = entry.getKey();
+      PipeStatus status = entry.getValue();
+      if (!expectedPipeToReplicaSet.containsKey(pipeName)) {
+        LOGGER.warn(
+            "[ConsensusPipeGuardian] unexpected consensus pipe [{}] exists, 
dropping asynchronously",
+            pipeName);
+        configManager.getProcedureManager().dropConsensusPipeAsync(pipeName);
+      } else if (PipeStatus.STOPPED.equals(status)) {
+        LOGGER.warn(
+            "[ConsensusPipeGuardian] consensus pipe [{}] is stopped, 
restarting asynchronously",
+            pipeName);
+        configManager.getProcedureManager().startConsensusPipe(pipeName);
+      }
+    }
+  }
+
+  private static TDataNodeLocation findLocationByNodeId(
+      List<TDataNodeLocation> locations, int nodeId) {
+    for (TDataNodeLocation location : locations) {
+      if (location.getDataNodeId() == nodeId) {
+        return location;
+      }
+    }
+    return null;
+  }
+
   private boolean isIoTConsensusV2DataRegion(TConsensusGroupId regionId) {
     return TConsensusGroupType.DataRegion.equals(regionId.getType())
         && IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass());
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
new file mode 100644
index 00000000000..094dcebe82f
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeTaskInfoConsensusPipeTest {
+
+  private PipeTaskInfo pipeTaskInfo;
+
+  @Before
+  public void setUp() {
+    pipeTaskInfo = new PipeTaskInfo();
+  }
+
+  private void createPipe(String pipeName, PipeStatus initialStatus) {
+    Map<String, String> extractorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+    extractorAttributes.put("extractor", "iotdb-source");
+    processorAttributes.put("processor", "do-nothing-processor");
+    connectorAttributes.put("connector", "iotdb-thrift-sink");
+
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+    ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
+    pipeTasks.put(1, pipeTaskMeta);
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            pipeName,
+            System.currentTimeMillis(),
+            extractorAttributes,
+            processorAttributes,
+            connectorAttributes);
+    PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    CreatePipePlanV2 createPlan = new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta);
+    pipeTaskInfo.createPipe(createPlan);
+
+    if (PipeStatus.RUNNING.equals(initialStatus)) {
+      pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, 
PipeStatus.RUNNING));
+    }
+  }
+
+  @Test
+  public void testGetConsensusPipeStatusMapEmpty() {
+    Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.isEmpty());
+  }
+
+  @Test
+  public void testGetConsensusPipeStatusMapFiltersOnlyConsensusPipes() {
+    String userPipeName = "myUserPipe";
+    createPipe(userPipeName, PipeStatus.RUNNING);
+
+    DataRegionId regionId = new DataRegionId(100);
+    String consensusPipeName1 = new ConsensusPipeName(regionId, 1, 
2).toString();
+    String consensusPipeName2 = new ConsensusPipeName(regionId, 2, 
1).toString();
+    createPipe(consensusPipeName1, PipeStatus.RUNNING);
+    createPipe(consensusPipeName2, PipeStatus.STOPPED);
+
+    Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+    Assert.assertEquals(2, result.size());
+    Assert.assertFalse(result.containsKey(userPipeName));
+    Assert.assertEquals(PipeStatus.RUNNING, result.get(consensusPipeName1));
+    Assert.assertEquals(PipeStatus.STOPPED, result.get(consensusPipeName2));
+  }
+
+  @Test
+  public void testGetConsensusPipeStatusMapWithOnlyUserPipes() {
+    createPipe("userPipe1", PipeStatus.RUNNING);
+    createPipe("userPipe2", PipeStatus.STOPPED);
+
+    Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+    Assert.assertNotNull(result);
+    Assert.assertTrue(result.isEmpty());
+  }
+
+  @Test
+  public void testGetConsensusPipeStatusMapWithMultipleRegions() {
+    DataRegionId region1 = new DataRegionId(100);
+    DataRegionId region2 = new DataRegionId(200);
+
+    String pipe1 = new ConsensusPipeName(region1, 1, 2).toString();
+    String pipe2 = new ConsensusPipeName(region1, 2, 1).toString();
+    String pipe3 = new ConsensusPipeName(region2, 3, 4).toString();
+    String pipe4 = new ConsensusPipeName(region2, 4, 3).toString();
+    createPipe(pipe1, PipeStatus.RUNNING);
+    createPipe(pipe2, PipeStatus.RUNNING);
+    createPipe(pipe3, PipeStatus.STOPPED);
+    createPipe(pipe4, PipeStatus.RUNNING);
+
+    Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+    Assert.assertEquals(4, result.size());
+    Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe1));
+    Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe2));
+    Assert.assertEquals(PipeStatus.STOPPED, result.get(pipe3));
+    Assert.assertEquals(PipeStatus.RUNNING, result.get(pipe4));
+  }
+
+  @Test
+  public void testGetConsensusPipeStatusMapExcludesSubscriptionPipes() {
+    String subscriptionPipeName = PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX + 
"topic1.group1";
+    createPipe(subscriptionPipeName, PipeStatus.RUNNING);
+
+    DataRegionId regionId = new DataRegionId(100);
+    String consensusPipeName = new ConsensusPipeName(regionId, 1, 
2).toString();
+    createPipe(consensusPipeName, PipeStatus.RUNNING);
+
+    Map<String, PipeStatus> result = pipeTaskInfo.getConsensusPipeStatusMap();
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey(consensusPipeName));
+    Assert.assertFalse(result.containsKey(subscriptionPipeName));
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
new file mode 100644
index 00000000000..b384b9e0f0a
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RegionMaintainHandlerConsensusPipeTest {
+
+  private ConfigManager configManager;
+  private PartitionManager partitionManager;
+  private PipeManager pipeManager;
+  private PipeTaskCoordinator pipeTaskCoordinator;
+  private ProcedureManager procedureManager;
+  private RegionMaintainHandler handler;
+
+  private String originalConsensusProtocol;
+
+  @Before
+  public void setUp() {
+    originalConsensusProtocol =
+        
ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass();
+    ConfigNodeDescriptor.getInstance()
+        .getConf()
+        .setDataRegionConsensusProtocolClass(IOT_CONSENSUS_V2);
+
+    configManager = mock(ConfigManager.class);
+    partitionManager = mock(PartitionManager.class);
+    pipeManager = mock(PipeManager.class);
+    pipeTaskCoordinator = mock(PipeTaskCoordinator.class);
+    procedureManager = mock(ProcedureManager.class);
+
+    when(configManager.getPartitionManager()).thenReturn(partitionManager);
+    when(configManager.getPipeManager()).thenReturn(pipeManager);
+    when(configManager.getProcedureManager()).thenReturn(procedureManager);
+    when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
+
+    handler = new RegionMaintainHandler(configManager);
+  }
+
+  @After
+  public void tearDown() {
+    ConfigNodeDescriptor.getInstance()
+        .getConf()
+        .setDataRegionConsensusProtocolClass(originalConsensusProtocol);
+  }
+
+  private TDataNodeLocation makeLocation(int nodeId, String ip, int 
consensusPort) {
+    TDataNodeLocation location = new TDataNodeLocation();
+    location.setDataNodeId(nodeId);
+    location.setClientRpcEndPoint(new TEndPoint(ip, 6667));
+    location.setInternalEndPoint(new TEndPoint(ip, 10730));
+    location.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740));
+    location.setDataRegionConsensusEndPoint(new TEndPoint(ip, consensusPort));
+    location.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10760));
+    return location;
+  }
+
+  private TRegionReplicaSet makeReplicaSet(int regionId, TDataNodeLocation... 
locations) {
+    TRegionReplicaSet replicaSet = new TRegionReplicaSet();
+    replicaSet.setRegionId(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId));
+    replicaSet.setDataNodeLocations(Arrays.asList(locations));
+    return replicaSet;
+  }
+
+  @Test
+  public void testNoOpWhenNotIoTConsensusV2() {
+    ConfigNodeDescriptor.getInstance()
+        .getConf()
+        
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(partitionManager, never()).getAllReplicaSetsMap(any());
+  }
+
+  @Test
+  public void testNothingToDoWhenAllPipesMatch() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+    TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+    TConsensusGroupId groupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+    replicaSets.put(groupId, replicaSet);
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(replicaSets);
+
+    DataRegionId regionId = new DataRegionId(100);
+    String pipe1to2 = new ConsensusPipeName(regionId, 1, 2).toString();
+    String pipe2to1 = new ConsensusPipeName(regionId, 2, 1).toString();
+    Map<String, PipeStatus> actualPipes = new HashMap<>();
+    actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+    actualPipes.put(pipe2to1, PipeStatus.RUNNING);
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(procedureManager, never()).createConsensusPipeAsync(any());
+    verify(procedureManager, never()).dropConsensusPipeAsync(any());
+    verify(procedureManager, never()).startConsensusPipe(any());
+  }
+
+  @Test
+  public void testCreatesMissingPipes() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+    TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+    TConsensusGroupId groupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+    replicaSets.put(groupId, replicaSet);
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(replicaSets);
+
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(procedureManager, 
times(2)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+    verify(procedureManager, never()).dropConsensusPipeAsync(any());
+    verify(procedureManager, never()).startConsensusPipe(any());
+  }
+
+  @Test
+  public void testDropsUnexpectedPipes() {
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(Collections.emptyMap());
+
+    DataRegionId regionId = new DataRegionId(999);
+    String unexpectedPipe = new ConsensusPipeName(regionId, 1, 2).toString();
+    Map<String, PipeStatus> actualPipes = new HashMap<>();
+    actualPipes.put(unexpectedPipe, PipeStatus.RUNNING);
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(procedureManager, never()).createConsensusPipeAsync(any());
+    verify(procedureManager, times(1)).dropConsensusPipeAsync(unexpectedPipe);
+    verify(procedureManager, never()).startConsensusPipe(any());
+  }
+
+  @Test
+  public void testRestartsStoppedPipes() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+    TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2);
+
+    TConsensusGroupId groupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+    replicaSets.put(groupId, replicaSet);
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(replicaSets);
+
+    DataRegionId regionId = new DataRegionId(100);
+    String pipe1to2 = new ConsensusPipeName(regionId, 1, 2).toString();
+    String pipe2to1 = new ConsensusPipeName(regionId, 2, 1).toString();
+    Map<String, PipeStatus> actualPipes = new HashMap<>();
+    actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+    actualPipes.put(pipe2to1, PipeStatus.STOPPED);
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(procedureManager, never()).createConsensusPipeAsync(any());
+    verify(procedureManager, never()).dropConsensusPipeAsync(any());
+    verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
+  }
+
+  @Test
+  public void testMixedScenario() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+    TDataNodeLocation loc3 = makeLocation(3, "127.0.0.3", 40010);
+    TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2, loc3);
+
+    TConsensusGroupId groupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+    replicaSets.put(groupId, replicaSet);
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(replicaSets);
+
+    DataRegionId regionId100 = new DataRegionId(100);
+    DataRegionId regionId999 = new DataRegionId(999);
+
+    // 3 nodes => 6 expected pipes: 1->2, 1->3, 2->1, 2->3, 3->1, 3->2
+    // Provide only 3 of the 6, one stopped; plus one unexpected pipe
+    String pipe1to2 = new ConsensusPipeName(regionId100, 1, 2).toString();
+    String pipe2to1 = new ConsensusPipeName(regionId100, 2, 1).toString();
+    String pipe3to1 = new ConsensusPipeName(regionId100, 3, 1).toString();
+    String unexpected = new ConsensusPipeName(regionId999, 5, 6).toString();
+
+    Map<String, PipeStatus> actualPipes = new HashMap<>();
+    actualPipes.put(pipe1to2, PipeStatus.RUNNING);
+    actualPipes.put(pipe2to1, PipeStatus.STOPPED);
+    actualPipes.put(pipe3to1, PipeStatus.RUNNING);
+    actualPipes.put(unexpected, PipeStatus.RUNNING);
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(actualPipes);
+
+    handler.checkAndRepairConsensusPipes();
+
+    // Missing: 1->3, 2->3, 3->2 => 3 creates
+    verify(procedureManager, 
times(3)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+    // Unexpected pipe => 1 drop
+    verify(procedureManager, times(1)).dropConsensusPipeAsync(unexpected);
+    // Stopped pipe 2->1 => 1 restart
+    verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
+  }
+
+  @Test
+  public void testThreeNodeReplicaSetCreatesAllSixPipes() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    TDataNodeLocation loc2 = makeLocation(2, "127.0.0.2", 40010);
+    TDataNodeLocation loc3 = makeLocation(3, "127.0.0.3", 40010);
+    TRegionReplicaSet replicaSet = makeReplicaSet(100, loc1, loc2, loc3);
+
+    TConsensusGroupId groupId = new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Map<TConsensusGroupId, TRegionReplicaSet> replicaSets = new HashMap<>();
+    replicaSets.put(groupId, replicaSet);
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(replicaSets);
+
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+    handler.checkAndRepairConsensusPipes();
+
+    // 3 nodes => 3*2 = 6 pipes to create
+    verify(procedureManager, 
times(6)).createConsensusPipeAsync(any(TCreatePipeReq.class));
+  }
+
+  @Test
+  public void testEmptyReplicaSetsAndEmptyPipes() {
+    when(partitionManager.getAllReplicaSetsMap(TConsensusGroupType.DataRegion))
+        .thenReturn(Collections.emptyMap());
+    
when(pipeTaskCoordinator.getConsensusPipeStatusMap()).thenReturn(Collections.emptyMap());
+
+    handler.checkAndRepairConsensusPipes();
+
+    verify(procedureManager, never()).createConsensusPipeAsync(any());
+    verify(procedureManager, never()).dropConsensusPipeAsync(any());
+    verify(procedureManager, never()).startConsensusPipe(any());
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index c0d7257183d..28625f1e9cd 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -20,9 +20,7 @@
 package org.apache.iotdb.consensus.config;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeReceiver;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ReplicateProgressManager;
 
 import java.util.concurrent.TimeUnit;
@@ -243,29 +241,20 @@ public class PipeConsensusConfig {
     private final String extractorPluginName;
     private final String processorPluginName;
     private final String connectorPluginName;
-    private final ConsensusPipeGuardian consensusPipeGuardian;
-    private final ConsensusPipeSelector consensusPipeSelector;
     private final ReplicateProgressManager replicateProgressManager;
     private final ConsensusPipeReceiver consensusPipeReceiver;
-    private final long consensusPipeGuardJobIntervalInSeconds;
 
     public Pipe(
         String extractorPluginName,
         String processorPluginName,
         String connectorPluginName,
-        ConsensusPipeGuardian consensusPipeGuardian,
-        ConsensusPipeSelector consensusPipeSelector,
         ReplicateProgressManager replicateProgressManager,
-        ConsensusPipeReceiver consensusPipeReceiver,
-        long consensusPipeGuardJobIntervalInSeconds) {
+        ConsensusPipeReceiver consensusPipeReceiver) {
       this.extractorPluginName = extractorPluginName;
       this.processorPluginName = processorPluginName;
       this.connectorPluginName = connectorPluginName;
-      this.consensusPipeGuardian = consensusPipeGuardian;
-      this.consensusPipeSelector = consensusPipeSelector;
       this.replicateProgressManager = replicateProgressManager;
       this.consensusPipeReceiver = consensusPipeReceiver;
-      this.consensusPipeGuardJobIntervalInSeconds = 
consensusPipeGuardJobIntervalInSeconds;
     }
 
     public String getExtractorPluginName() {
@@ -280,14 +269,6 @@ public class PipeConsensusConfig {
       return connectorPluginName;
     }
 
-    public ConsensusPipeGuardian getConsensusPipeGuardian() {
-      return consensusPipeGuardian;
-    }
-
-    public ConsensusPipeSelector getConsensusPipeSelector() {
-      return consensusPipeSelector;
-    }
-
     public ConsensusPipeReceiver getConsensusPipeReceiver() {
       return consensusPipeReceiver;
     }
@@ -296,10 +277,6 @@ public class PipeConsensusConfig {
       return replicateProgressManager;
     }
 
-    public long getConsensusPipeGuardJobIntervalInSeconds() {
-      return consensusPipeGuardJobIntervalInSeconds;
-    }
-
     public static Pipe.Builder newBuilder() {
       return new Pipe.Builder();
     }
@@ -310,11 +287,8 @@ public class PipeConsensusConfig {
           BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName();
       private String connectorPluginName =
           BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName();
-      private ConsensusPipeGuardian consensusPipeGuardian = null;
-      private ConsensusPipeSelector consensusPipeSelector = null;
       private ReplicateProgressManager replicateProgressManager = null;
       private ConsensusPipeReceiver consensusPipeReceiver = null;
-      private long consensusPipeGuardJobIntervalInSeconds = 180L;
 
       public Pipe.Builder setExtractorPluginName(String extractorPluginName) {
         this.extractorPluginName = extractorPluginName;
@@ -331,16 +305,6 @@ public class PipeConsensusConfig {
         return this;
       }
 
-      public Pipe.Builder setConsensusPipeGuardian(ConsensusPipeGuardian 
consensusPipeGuardian) {
-        this.consensusPipeGuardian = consensusPipeGuardian;
-        return this;
-      }
-
-      public Pipe.Builder setConsensusPipeSelector(ConsensusPipeSelector 
consensusPipeSelector) {
-        this.consensusPipeSelector = consensusPipeSelector;
-        return this;
-      }
-
       public Pipe.Builder setConsensusPipeReceiver(ConsensusPipeReceiver 
consensusPipeReceiver) {
         this.consensusPipeReceiver = consensusPipeReceiver;
         return this;
@@ -352,22 +316,13 @@ public class PipeConsensusConfig {
         return this;
       }
 
-      public Pipe.Builder setConsensusPipeGuardJobIntervalInSeconds(
-          long consensusPipeGuardJobIntervalInSeconds) {
-        this.consensusPipeGuardJobIntervalInSeconds = 
consensusPipeGuardJobIntervalInSeconds;
-        return this;
-      }
-
       public Pipe build() {
         return new Pipe(
             extractorPluginName,
             processorPluginName,
             connectorPluginName,
-            consensusPipeGuardian,
-            consensusPipeSelector,
             replicateProgressManager,
-            consensusPipeReceiver,
-            consensusPipeGuardJobIntervalInSeconds);
+            consensusPipeReceiver);
       }
     }
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 33d73d673bf..2981a1a9204 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
@@ -50,16 +49,12 @@ import 
org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeSelector;
 import org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCService;
 import 
org.apache.iotdb.consensus.pipe.service.PipeConsensusRPCServiceProcessor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,10 +77,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
 public class PipeConsensus implements IConsensus {
-  private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID = 
"consensus_pipe_guardian";
   private static final String CLASS_NAME = PipeConsensus.class.getSimpleName();
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensus.class);
 
@@ -101,8 +94,6 @@ public class PipeConsensus implements IConsensus {
       new ConcurrentHashMap<>();
   private final ReentrantReadWriteLock stateMachineMapLock = new 
ReentrantReadWriteLock();
   private final PipeConsensusConfig config;
-  private final ConsensusPipeSelector consensusPipeSelector;
-  private final ConsensusPipeGuardian consensusPipeGuardian;
   private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
   private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;
@@ -114,10 +105,6 @@ public class PipeConsensus implements IConsensus {
     this.config = config.getPipeConsensusConfig();
     this.registry = registry;
     this.rpcService = new PipeConsensusRPCService(thisNode, 
config.getPipeConsensusConfig());
-    this.consensusPipeSelector =
-        config.getPipeConsensusConfig().getPipe().getConsensusPipeSelector();
-    this.consensusPipeGuardian =
-        config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
     this.asyncClientManager =
         
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
     this.syncClientManager =
@@ -145,11 +132,6 @@ public class PipeConsensus implements IConsensus {
       Thread.currentThread().interrupt();
       LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
     }
-    // only when we recover all consensus group can we launch async backend 
checker thread
-    consensusPipeGuardian.start(
-        CONSENSUS_PIPE_GUARDIAN_TASK_ID,
-        this::checkAllConsensusPipe,
-        config.getPipe().getConsensusPipeGuardJobIntervalInSeconds());
   }
 
   private Future<Void> initAndRecover() throws IOException {
@@ -233,39 +215,10 @@ public class PipeConsensus implements IConsensus {
     asyncClientManager.close();
     syncClientManager.close();
     registerManager.deregisterAll();
-    consensusPipeGuardian.stop();
     
stateMachineMap.values().parallelStream().forEach(PipeConsensusServerImpl::stop);
     IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
   }
 
-  private void checkAllConsensusPipe() {
-    final Map<ConsensusGroupId, Map<ConsensusPipeName, PipeStatus>> 
existedPipes =
-        consensusPipeSelector.getAllConsensusPipe().entrySet().stream()
-            .filter(entry -> entry.getKey().getSenderDataNodeId() == 
thisNodeId)
-            .collect(
-                Collectors.groupingBy(
-                    entry -> entry.getKey().getConsensusGroupId(),
-                    Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
-    stateMachineMapLock.writeLock().lock();
-    try {
-      stateMachineMap.forEach(
-          (key, value) ->
-              value.checkConsensusPipe(existedPipes.getOrDefault(key, 
ImmutableMap.of())));
-      // Log orphaned pipes (region no longer exists locally); ConfigNode 
handles actual cleanup.
-      existedPipes.entrySet().stream()
-          .filter(entry -> !stateMachineMap.containsKey(entry.getKey()))
-          .flatMap(entry -> entry.getValue().keySet().stream())
-          .forEach(
-              consensusPipeName ->
-                  LOGGER.warn(
-                      "{} orphaned consensus pipe [{}] found, should be 
dropped by ConfigNode",
-                      consensusPipeName.getConsensusGroupId(),
-                      consensusPipeName));
-    } finally {
-      stateMachineMapLock.writeLock().unlock();
-    }
-  }
-
   @Override
   public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
       throws ConsensusException {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 7aed02e075f..2ef293a960a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
@@ -56,7 +55,6 @@ import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResour
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +62,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -135,46 +132,6 @@ public class PipeConsensusServerImpl {
     active.set(false);
   }
 
-  /**
-   * Detect inconsistencies between expected and existed consensus pipes. 
Actual remediation
-   * (create/drop/update) is handled by ConfigNode; this method only logs 
warnings.
-   */
-  public synchronized void checkConsensusPipe(Map<ConsensusPipeName, 
PipeStatus> existedPipes) {
-    final PipeStatus expectedStatus = isStarted.get() ? PipeStatus.RUNNING : 
PipeStatus.STOPPED;
-    final Map<ConsensusPipeName, Peer> expectedPipes =
-        peerManager.getOtherPeers(thisNode).stream()
-            .collect(
-                ImmutableMap.toImmutableMap(
-                    peer -> new ConsensusPipeName(thisNode, peer), peer -> 
peer));
-
-    existedPipes.forEach(
-        (existedName, existedStatus) -> {
-          if (!expectedPipes.containsKey(existedName)) {
-            LOGGER.warn(
-                "{} unexpected consensus pipe [{}] exists, should be dropped 
by ConfigNode",
-                consensusGroupId,
-                existedName);
-          } else if (!expectedStatus.equals(existedStatus)) {
-            LOGGER.warn(
-                "{} consensus pipe [{}] status mismatch: expected={}, 
actual={}",
-                consensusGroupId,
-                existedName,
-                expectedStatus,
-                existedStatus);
-          }
-        });
-
-    expectedPipes.forEach(
-        (expectedName, expectedPeer) -> {
-          if (!existedPipes.containsKey(expectedName)) {
-            LOGGER.warn(
-                "{} consensus pipe [{}] missing, should be created by 
ConfigNode",
-                consensusGroupId,
-                expectedName);
-          }
-        });
-  }
-
   public TSStatus write(IConsensusRequest request) {
     stateMachineLock.lock();
     try {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
deleted file mode 100644
index 6c1e9cc1eb1..00000000000
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeGuardian.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-public interface ConsensusPipeGuardian {
-  void start(String id, Runnable guardJob, long intervalInSeconds);
-
-  void stop();
-}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
deleted file mode 100644
index 6af75b5718e..00000000000
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeSelector.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.pipe.consensuspipe;
-
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-
-import java.util.Map;
-
-public interface ConsensusPipeSelector {
-  Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe();
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index a33cdd11024..0a908ff8967 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
 import 
org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import 
org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
 import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -186,13 +185,8 @@ public class DataRegionConsensusImpl {
                               
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName())
                           .setConnectorPluginName(
                               
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName())
-                          // name
-                          .setConsensusPipeGuardian(new 
ConsensusPipeDataNodeRuntimeAgentGuardian())
-                          .setConsensusPipeSelector(
-                              () -> 
PipeDataNodeAgent.task().getAllConsensusPipe())
                           
.setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus())
                           .setProgressIndexManager(new 
ReplicateProgressDataNodeManager())
-                          .setConsensusPipeGuardJobIntervalInSeconds(300)
                           .build())
                   
.setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode()))
                   .build())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 2f1be77fe3e..0f8b9446d60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -46,7 +45,6 @@ import 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -75,7 +73,6 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -102,7 +99,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
@@ -673,23 +669,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
-    if (!tryReadLockWithTimeOut(10)) {
-      throw new PipeException("Failed to get all consensus pipe.");
-    }
-
-    try {
-      return 
StreamSupport.stream(pipeMetaKeeper.getPipeMetaList().spliterator(), false)
-          .filter(pipeMeta -> 
PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType()))
-          .collect(
-              ImmutableMap.toImmutableMap(
-                  pipeMeta -> new 
ConsensusPipeName(pipeMeta.getStaticMeta().getPipeName()),
-                  pipeMeta -> pipeMeta.getRuntimeMeta().getStatus().get()));
-    } finally {
-      releaseReadLock();
-    }
-  }
-
   @Override
   protected void calculateMemoryUsage(
       final PipeStaticMeta staticMeta,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
deleted file mode 100644
index 2f3cf532fc3..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.consensus;
-
-import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsensusPipeDataNodeRuntimeAgentGuardian implements 
ConsensusPipeGuardian {
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(ConsensusPipeDataNodeRuntimeAgentGuardian.class);
-  private boolean registered = false;
-
-  @Override
-  public synchronized void start(String id, Runnable guardJob, long 
intervalInSeconds) {
-    if (!registered) {
-      LOGGER.info(
-          "Registering periodical job {} with interval in seconds {}.", id, 
intervalInSeconds);
-
-      this.registered = true;
-      PipeDataNodeAgent.runtime().registerPeriodicalJob(id, guardJob, 
intervalInSeconds);
-    }
-  }
-
-  @Override
-  public synchronized void stop() {
-    // Do nothing because PipePeriodicalJobExecutor currently has no 
deregister logic
-  }
-}

Reply via email to