This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch potential-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3167d75f51b61eaf707602253572903c5ff07b92
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 13 18:54:58 2026 +0800

    Fix
---
 .../schemaregion/SchemaExecutionVisitor.java       | 20 +++--
 .../schemaregion/SchemaRegionStateMachine.java     | 46 ++++++++----
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    | 12 +++
 .../runtime/PipeSchemaRegionListenerManager.java   | 85 ++++++++++++----------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 15 +++-
 .../processor/aggregate/AggregateProcessor.java    | 19 +++--
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  1 +
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  1 +
 .../thrift/impl/DataNodeRegionManager.java         |  8 +-
 .../PipeSchemaRegionListenerManagerTest.java       | 72 ++++++++++++++++++
 .../task/progress/PipeEventCommitManager.java      | 21 ++++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  2 +-
 12 files changed, 229 insertions(+), 73 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index f3ddfbab7b2..2fc2694c4c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -843,13 +843,19 @@ public class SchemaExecutionVisitor implements PlanVisitor<TSStatus, ISchemaRegi
   public TSStatus visitPipeOperateSchemaQueueNode(
       final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) {
     final SchemaRegionId id = schemaRegion.getSchemaRegionId();
-    final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id);
-    if (node.isOpen() && !queue.isOpened()) {
-      logger.info("Opened pipe listening queue on schema region {}", id);
-      queue.open();
-    } else if (!node.isOpen() && queue.isOpened()) {
-      logger.info("Closed pipe listening queue on schema region {}", id);
-      queue.close();
+    if (node.isOpen()) {
+      final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id);
+      if (!queue.isOpened()) {
+        logger.info("Opened pipe listening queue on schema region {}", id);
+        queue.open();
+      }
+    } else {
+      final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListenerIfPresent(id);
+      if (queue != null && queue.isOpened()) {
+        logger.info("Closed pipe listening queue on schema region {}", id);
+        queue.close();
+        PipeDataNodeAgent.runtime().cleanupSchemaListenerIfUnused(id);
+      }
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index edab5862d11..d904da7df31 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -125,25 +125,34 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   @Override
   public boolean takeSnapshot(final File snapshotDir) {
-    if (schemaRegion.createSnapshot(snapshotDir)
-        && PipeDataNodeAgent.runtime()
-            .schemaListener(schemaRegion.getSchemaRegionId())
-            .createSnapshot(snapshotDir)) {
-      listen2Snapshot4PipeListener(true);
+    if (!schemaRegion.createSnapshot(snapshotDir)) {
+      return false;
+    }
+
+    final SchemaRegionListeningQueue listener =
+        PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
+    if (listener == null) {
       return true;
     }
-    return false;
+
+    if (!listener.createSnapshot(snapshotDir)) {
+      return false;
+    }
+    listen2Snapshot4PipeListener(true);
+    return true;
   }
 
   @Override
   public void loadSnapshot(final File latestSnapshotRootDir) {
     schemaRegion.loadSnapshot(latestSnapshotRootDir);
-    PipeDataNodeAgent.runtime()
-        .schemaListener(schemaRegion.getSchemaRegionId())
-        .loadSnapshot(latestSnapshotRootDir);
-    // We recompute the snapshot for pipe listener when loading snapshot
-    // to recover the newest snapshot in cache
-    listen2Snapshot4PipeListener(false);
+    final SchemaRegionListeningQueue listener =
+        PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
+    if (listener != null) {
+      listener.loadSnapshot(latestSnapshotRootDir);
+      // We recompute the snapshot for pipe listener when loading snapshot
+      // to recover the newest snapshot in cache
+      listen2Snapshot4PipeListener(false);
+    }
   }
 
   public void listen2Snapshot4PipeListener(final boolean isTmp) {
@@ -154,7 +163,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
                 .toString(),
             isTmp);
     final SchemaRegionListeningQueue listener =
-        PipeDataNodeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
+        PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
+    if (listener == null) {
+      return;
+    }
     if (Objects.isNull(snapshotPaths) || Objects.isNull(snapshotPaths.get(0))) {
       if (listener.isOpened()) {
         logger.warn(
@@ -181,9 +193,11 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
       final TSStatus result =
           ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        PipeDataNodeAgent.runtime()
-            .schemaListener(schemaRegion.getSchemaRegionId())
-            .tryListenToNode((PlanNode) request);
+        final SchemaRegionListeningQueue listener =
+            PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
+        if (listener != null) {
+          listener.tryListenToNode((PlanNode) request);
+        }
       }
       return result;
     } catch (final IllegalArgumentException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 8f75b7b1885..3d3547f649b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -148,6 +148,10 @@ public class PipeDataNodeRuntimeAgent implements IService {
     return regionListenerManager.listener(schemaRegionId);
   }
 
+  public SchemaRegionListeningQueue schemaListenerIfPresent(SchemaRegionId schemaRegionId) {
+    return regionListenerManager.listenerIfPresent(schemaRegionId);
+  }
+
   public int increaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegionId) {
     return regionListenerManager.increaseAndGetReferenceCount(schemaRegionId);
   }
@@ -156,6 +160,14 @@ public class PipeDataNodeRuntimeAgent implements IService {
     return regionListenerManager.decreaseAndGetReferenceCount(schemaRegionId);
   }
 
+  public void cleanupSchemaListenerIfUnused(SchemaRegionId schemaRegionId) {
+    regionListenerManager.cleanupListenerIfUnused(schemaRegionId);
+  }
+
+  public void clearSchemaRegionState(SchemaRegionId schemaRegionId) {
+    regionListenerManager.clearSchemaRegionState(schemaRegionId);
+  }
+
   public void notifySchemaLeaderReady(SchemaRegionId schemaRegionId) {
     regionListenerManager.notifyLeaderReady(schemaRegionId);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java
index d28929b36a2..3b03146256b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionListenerMetrics;
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 
@@ -34,6 +33,7 @@ public class PipeSchemaRegionListenerManager {
 
   private final Map<SchemaRegionId, PipeSchemaRegionListener> id2ListenerMap =
       new ConcurrentHashMap<>();
+  private final Map<SchemaRegionId, AtomicBoolean> id2LeaderReadyMap = new ConcurrentHashMap<>();
 
   public synchronized Set<SchemaRegionId> regionIds() {
     return id2ListenerMap.keySet();
@@ -44,6 +44,12 @@ public class PipeSchemaRegionListenerManager {
         .listeningQueue;
   }
 
+  public synchronized SchemaRegionListeningQueue listenerIfPresent(
+      final SchemaRegionId schemaRegionId) {
+    final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
+    return listener == null ? null : listener.listeningQueue;
+  }
+
   public synchronized int increaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) {
     return id2ListenerMap
         .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
@@ -52,28 +58,55 @@ public class PipeSchemaRegionListenerManager {
   }
 
   public synchronized int decreaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) {
-    return id2ListenerMap
-        .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
-        .listeningQueueReferenceCount
-        .updateAndGet(v -> v > 0 ? v - 1 : 0);
+    final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
+    if (listener == null) {
+      return 0;
+    }
+
+    final int referenceCount =
+        listener.listeningQueueReferenceCount.updateAndGet(v -> v > 0 ? v - 1 : 0);
+    if (referenceCount == 0 && !listener.listeningQueue.isOpened()) {
+      cleanupListenerIfUnused(schemaRegionId, listener);
+    }
+    return referenceCount;
   }
 
   public synchronized void notifyLeaderReady(final SchemaRegionId schemaRegionId) {
-    id2ListenerMap
-        .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
-        .notifyLeaderReady();
+    id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(true);
   }
 
   public synchronized void notifyLeaderUnavailable(final SchemaRegionId schemaRegionId) {
-    id2ListenerMap
-        .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
-        .notifyLeaderUnavailable();
+    id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(false);
   }
 
   public synchronized boolean isLeaderReady(final SchemaRegionId schemaRegionId) {
-    return id2ListenerMap
-        .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
-        .isLeaderReady();
+    final AtomicBoolean isLeaderReady = id2LeaderReadyMap.get(schemaRegionId);
+    return isLeaderReady != null && isLeaderReady.get();
+  }
+
+  public synchronized void cleanupListenerIfUnused(final SchemaRegionId schemaRegionId) {
+    final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
+    if (listener != null) {
+      cleanupListenerIfUnused(schemaRegionId, listener);
+    }
+  }
+
+  public synchronized void clearSchemaRegionState(final SchemaRegionId schemaRegionId) {
+    final PipeSchemaRegionListener listener = id2ListenerMap.remove(schemaRegionId);
+    if (listener != null) {
+      PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId());
+    }
+    id2LeaderReadyMap.remove(schemaRegionId);
+  }
+
+  private void cleanupListenerIfUnused(
+      final SchemaRegionId schemaRegionId, final PipeSchemaRegionListener listener) {
+    if (listener.listeningQueueReferenceCount.get() > 0 || listener.listeningQueue.isOpened()) {
+      return;
+    }
+    if (id2ListenerMap.remove(schemaRegionId, listener)) {
+      PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId());
+    }
   }
 
   private static class PipeSchemaRegionListener {
@@ -81,33 +114,9 @@ public class PipeSchemaRegionListenerManager {
     private final SchemaRegionListeningQueue listeningQueue = new SchemaRegionListeningQueue();
     private final AtomicInteger listeningQueueReferenceCount = new AtomicInteger(0);
 
-    private final AtomicBoolean isLeaderReady = new AtomicBoolean(false);
-
     protected PipeSchemaRegionListener(final SchemaRegionId schemaRegionId) {
       PipeSchemaRegionListenerMetrics.getInstance()
           .register(listeningQueue, schemaRegionId.getId());
     }
-
-    /**
-     * Get leader ready state, DO NOT use consensus layer's leader ready flag because
-     * SimpleConsensus' ready flag is always {@code true}. Note that this flag has nothing to do
-     * with listening and a {@link PipeTask} starts only iff the current node is a leader and ready.
-     *
-     * @return {@code true} iff the current node is a leader and ready
-     */
-    private boolean isLeaderReady() {
-      return isLeaderReady.get();
-    }
-
-    // Leader ready flag has the following effect
-    // 1. The linked list starts serving only after leader gets ready
-    // 2. Config pipe task is only created after leader gets ready
-    private void notifyLeaderReady() {
-      isLeaderReady.set(true);
-    }
-
-    private void notifyLeaderUnavailable() {
-      isLeaderReady.set(false);
-    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 06d6a512b30..fd57495b14c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
+import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -59,6 +60,7 @@ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter;
+import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -267,10 +269,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
 
     schemaRegionId2ListeningQueueNewFirstIndex.forEach(
-        (schemaRegionId, listeningQueueNewFirstIndex) ->
-            PipeDataNodeAgent.runtime()
-                .schemaListener(new SchemaRegionId(schemaRegionId))
-                .removeBefore(listeningQueueNewFirstIndex));
+        (schemaRegionId, listeningQueueNewFirstIndex) -> {
+          final SchemaRegionListeningQueue listeningQueue =
+              PipeDataNodeAgent.runtime().schemaListenerIfPresent(new SchemaRegionId(schemaRegionId));
+          if (listeningQueue != null) {
+            listeningQueue.removeBefore(listeningQueueNewFirstIndex);
+          }
+        });
 
     return schemaRegionId2ListeningQueueNewFirstIndex.keySet();
   }
@@ -323,6 +328,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final String taskId = pipeName + "_" + creationTime;
     PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
     PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
+    PipeEventCommitManager.getInstance().clear(pipeName, creationTime);
 
     return true;
   }
@@ -351,6 +357,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final String taskId = pipeName + "_" + creationTime;
       PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
       PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
+      PipeEventCommitManager.getInstance().clear(pipeName, creationTime);
       // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
       // subscribed pipe, so the subscription needs to be manually marked as completed.
       if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 63bdbd1daec..24c930a04a6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -842,12 +842,19 @@ public class AggregateProcessor implements PipeProcessor {
 
   @Override
   public void close() throws Exception {
-    if (Objects.nonNull(pipeName)
-        && pipeName2referenceCountMap.compute(
-                pipeName, (name, count) -> Objects.nonNull(count) ? count - 1 : 0)
-            == 0) {
-      pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).clear();
-      pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName);
+    boolean isLastReference = false;
+    if (Objects.nonNull(pipeName)) {
+      isLastReference =
+          pipeName2referenceCountMap.computeIfPresent(
+                  pipeName, (name, count) -> count > 1 ? count - 1 : null)
+              == null;
+    }
+    if (isLastReference) {
+      final ConcurrentMap<String, AtomicReference<TimeSeriesRuntimeState>>
+          timeSeries2RuntimeStateMap = pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName);
+      if (timeSeries2RuntimeStateMap != null) {
+        timeSeries2RuntimeStateMap.clear();
+      }
       pipeName2LastValueReceiveTimeMap.remove(pipeName);
     }
     if (Objects.nonNull(windowingProcessor)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 554c6c43dfb..4030ae580c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -743,6 +743,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
 
     if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index e8c4420861c..b255e67404e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -601,6 +601,7 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink {
   @Override
   public synchronized void discardEventsOfPipe(
       final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
+    super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     if (Objects.nonNull(tabletBatchBuilder)) {
       tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index 4fc2cfbff9c..1cd49963240 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -203,7 +204,12 @@ public class DataNodeRegionManager {
   public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) {
     try {
       schemaEngine.deleteSchemaRegion(schemaRegionId);
-      PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).close();
+      final SchemaRegionListeningQueue listener =
+          PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegionId);
+      if (listener != null) {
+        listener.close();
+      }
+      PipeDataNodeAgent.runtime().clearSchemaRegionState(schemaRegionId);
       schemaRegionLockMap.remove(schemaRegionId);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
     } catch (MetadataException e) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java
new file mode 100644
index 00000000000..3e08d23978b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PipeSchemaRegionListenerManagerTest {
+
+  @Test
+  public void testLeaderStateDoesNotCreateListener() {
+    final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager();
+    final SchemaRegionId schemaRegionId = new SchemaRegionId(1);
+
+    manager.notifyLeaderReady(schemaRegionId);
+
+    Assert.assertTrue(manager.isLeaderReady(schemaRegionId));
+    Assert.assertTrue(manager.regionIds().isEmpty());
+    Assert.assertNull(manager.listenerIfPresent(schemaRegionId));
+  }
+
+  @Test
+  public void testCleanupUnusedClosedListener() {
+    final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager();
+    final SchemaRegionId schemaRegionId = new SchemaRegionId(2);
+
+    final SchemaRegionListeningQueue listeningQueue = manager.listener(schemaRegionId);
+    Assert.assertEquals(1, manager.increaseAndGetReferenceCount(schemaRegionId));
+
+    listeningQueue.open();
+    Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId));
+    Assert.assertNotNull(manager.listenerIfPresent(schemaRegionId));
+
+    listeningQueue.close();
+    manager.cleanupListenerIfUnused(schemaRegionId);
+
+    Assert.assertNull(manager.listenerIfPresent(schemaRegionId));
+    Assert.assertTrue(manager.regionIds().isEmpty());
+  }
+
+  @Test
+  public void testAutoCleanupAfterLastReferenceReleased() {
+    final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager();
+    final SchemaRegionId schemaRegionId = new SchemaRegionId(3);
+
+    manager.increaseAndGetReferenceCount(schemaRegionId);
+
+    Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId));
+    Assert.assertNull(manager.listenerIfPresent(schemaRegionId));
+    Assert.assertTrue(manager.regionIds().isEmpty());
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index b47441753c4..82ee87f6c8d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -78,6 +78,13 @@ public class PipeEventCommitManager {
     LOGGER.info("Pipe committer deregistered for pipe on region: {}", committerKey);
   }
 
+  public void clear(final String pipeName, final long creationTime) {
+    eventCommitterMap.keySet().removeIf(key -> clearIfMatches(key, pipeName, creationTime, true));
+    eventCommitterRestartTimesMap
+        .keySet()
+        .removeIf(key -> clearIfMatches(key, pipeName, creationTime, false));
+  }
+
   /**
    * Assign a commit id and a key for commit. Make sure {@code EnrichedEvent.pipeName} is set before
    * calling this.
@@ -197,6 +204,20 @@ public class PipeEventCommitManager {
     this.commitRateMarker = commitRateMarker;
   }
 
+  private boolean clearIfMatches(
+      final CommitterKey key,
+      final String pipeName,
+      final long creationTime,
+      final boolean shouldClearMetrics) {
+    if (!Objects.equals(key.getPipeName(), pipeName) || key.getCreationTime() != creationTime) {
+      return false;
+    }
+    if (shouldClearMetrics) {
+      PipeEventCommitMetrics.getInstance().deregister(key.stringify());
+    }
+    return true;
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private PipeEventCommitManager() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index ebb3d90bc8c..a7e603bd9bc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -643,7 +643,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent
    */
   public synchronized void discardEventsOfPipe(
       final String pipeName, final long creationTime, final int regionId) {
-    // Do nothing by default
+    PIPE_END_POINT_RATE_LIMITER_MAP.remove(new Pair<>(pipeName, creationTime));
   }
 
   public PipeReceiverStatusHandler statusHandler() {

Reply via email to