This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch potential-leak
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 29804ff25169f05d98f99d3591056b20a17836d8
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 15:33:50 2026 +0800

    leak_fix
---
 .../overview/PipeDataNodeSinglePipeMetrics.java    | 135 ++++++++++---------
 .../overview/PipeTsFileToTabletsMetrics.java       |  18 ++-
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 102 +++++++++++---
 .../commons/pipe/agent/task/PipeTaskAgentTest.java | 149 +++++++++++++++++++++
 4 files changed, 318 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 6535d371a91..b9d90fd8a68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -139,12 +139,14 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
   }
 
   private void removeMetrics(final String pipeID) {
-    removeAutoGauge(pipeID);
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.remove(pipeID);
+    if (Objects.nonNull(operator) && Objects.nonNull(metricService)) {
+      removeAutoGauge(operator);
+    }
   }
 
-  private void removeAutoGauge(final String pipeID) {
-    final PipeDataNodeRemainingEventAndTimeOperator operator =
-        remainingEventAndTimeOperatorMap.get(pipeID);
+  private void removeAutoGauge(final PipeDataNodeRemainingEventAndTimeOperator 
operator) {
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(),
@@ -190,14 +192,31 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
         Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(),
         Tag.NAME.toString(),
         operator.getPipeName());
-    remainingEventAndTimeOperatorMap.remove(pipeID);
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
+  private static String generatePipeID(final String pipeName, final long 
creationTime) {
+    return pipeName + "_" + creationTime;
+  }
+
+  private boolean isPipeAlive(final String pipeName, final long creationTime) {
+    return PipeDataNodeAgent.task().getPipeCreationTime(pipeName) == 
creationTime;
+  }
+
+  private PipeDataNodeRemainingEventAndTimeOperator 
getOrCreateOperatorIfPipeAlive(
+      final String pipeName, final long creationTime) {
+    if (!isPipeAlive(pipeName, creationTime)) {
+      return null;
+    }
+    return remainingEventAndTimeOperatorMap.computeIfAbsent(
+        generatePipeID(pipeName, creationTime),
+        key -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+  }
+
   public void register(final IoTDBDataRegionSource source) {
     // The metric is global thus the regionId is omitted
-    final String pipeID = source.getPipeName() + "_" + 
source.getCreationTime();
+    final String pipeID = generatePipeID(source.getPipeName(), 
source.getCreationTime());
     remainingEventAndTimeOperatorMap.computeIfAbsent(
         pipeID,
         k ->
@@ -210,7 +229,7 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   public void register(final IoTDBSchemaRegionSource source) {
     // The metric is global thus the regionId is omitted
-    final String pipeID = source.getPipeName() + "_" + 
source.getCreationTime();
+    final String pipeID = generatePipeID(source.getPipeName(), 
source.getCreationTime());
     remainingEventAndTimeOperatorMap
         .computeIfAbsent(
             pipeID,
@@ -224,19 +243,20 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
   }
 
   public void increaseInsertNodeEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .increaseInsertNodeEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        getOrCreateOperatorIfPipeAlive(pipeName, creationTime);
+    if (Objects.nonNull(operator)) {
+      operator.increaseInsertNodeEventCount();
+    }
   }
 
   public void decreaseInsertNodeEventCount(
       final String pipeName, final long creationTime, final long transferTime) 
{
     final PipeDataNodeRemainingEventAndTimeOperator operator =
-        remainingEventAndTimeOperatorMap.computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.isNull(operator)) {
+      return;
+    }
 
     operator.decreaseInsertNodeEventCount();
 
@@ -247,46 +267,44 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   public void updateInsertNodeTransferTimer(
       final String pipeName, final long creationTime, final long transferTime) 
{
-    if (transferTime > 0) {
-      remainingEventAndTimeOperatorMap
-          .computeIfAbsent(
-              pipeName + "_" + creationTime,
-              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-          .getInsertNodeTransferTimer()
-          .update(transferTime, TimeUnit.NANOSECONDS);
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.nonNull(operator) && transferTime > 0) {
+      operator.getInsertNodeTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
     }
   }
 
   public void increaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .increaseRawTabletEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        getOrCreateOperatorIfPipeAlive(pipeName, creationTime);
+    if (Objects.nonNull(operator)) {
+      operator.increaseRawTabletEventCount();
+    }
   }
 
   public void decreaseRawTabletEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .decreaseRawTabletEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.nonNull(operator)) {
+      operator.decreaseRawTabletEventCount();
+    }
   }
 
   public void increaseTsFileEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .increaseTsFileEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        getOrCreateOperatorIfPipeAlive(pipeName, creationTime);
+    if (Objects.nonNull(operator)) {
+      operator.increaseTsFileEventCount();
+    }
   }
 
   public void decreaseTsFileEventCount(
       final String pipeName, final long creationTime, final long transferTime) 
{
     final PipeDataNodeRemainingEventAndTimeOperator operator =
-        remainingEventAndTimeOperatorMap.computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.isNull(operator)) {
+      return;
+    }
 
     operator.decreaseTsFileEventCount();
 
@@ -297,30 +315,27 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
 
   public void updateTsFileTransferTimer(
       final String pipeName, final long creationTime, final long transferTime) 
{
-    if (transferTime > 0) {
-      remainingEventAndTimeOperatorMap
-          .computeIfAbsent(
-              pipeName + "_" + creationTime,
-              k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-          .getTsFileTransferTimer()
-          .update(transferTime, TimeUnit.NANOSECONDS);
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.nonNull(operator) && transferTime > 0) {
+      operator.getTsFileTransferTimer().update(transferTime, 
TimeUnit.NANOSECONDS);
     }
   }
 
   public void increaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .increaseHeartbeatEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        getOrCreateOperatorIfPipeAlive(pipeName, creationTime);
+    if (Objects.nonNull(operator)) {
+      operator.increaseHeartbeatEventCount();
+    }
   }
 
   public void decreaseHeartbeatEventCount(final String pipeName, final long 
creationTime) {
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(
-            pipeName + "_" + creationTime,
-            k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime))
-        .decreaseHeartbeatEventCount();
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, 
creationTime));
+    if (Objects.nonNull(operator)) {
+      operator.decreaseHeartbeatEventCount();
+    }
   }
 
   public void thawRate(final String pipeID) {
@@ -350,9 +365,7 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
           pipeID);
       return;
     }
-    if (Objects.nonNull(metricService)) {
-      removeMetrics(pipeID);
-    }
+    removeMetrics(pipeID);
   }
 
   public void markRegionCommit(final String pipeID, final boolean 
isDataRegion) {
@@ -395,7 +408,7 @@ public class PipeDataNodeSinglePipeMetrics implements 
IMetricSet {
       final String pipeName, final long creationTime) {
     final PipeDataNodeRemainingEventAndTimeOperator operator =
         remainingEventAndTimeOperatorMap.computeIfAbsent(
-            pipeName + "_" + creationTime,
+            generatePipeID(pipeName, creationTime),
             k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, 
creationTime));
     return new Pair<>(operator.getRemainingNonHeartbeatEvents(), 
operator.getRemainingTime());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
index f9436377bb3..90260fd17fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -110,39 +110,47 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
   }
 
   private void removeMetrics(final String pipeID) {
+    pipeTimerMap.remove(pipeID);
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
         Tag.NAME.toString(),
         pipeID);
-    pipeTimerMap.remove(pipeID);
 
+    pipeRateMap.remove(pipeID);
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
         Tag.NAME.toString(),
         pipeID);
-    pipeRateMap.remove(pipeID);
 
+    pipeTabletCountMap.remove(pipeID);
     metricService.remove(
         MetricType.COUNTER,
         Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
         Tag.NAME.toString(),
         pipeID);
-    pipeTabletCountMap.remove(pipeID);
 
+    pipeTabletMemoryMap.remove(pipeID);
     metricService.remove(
         MetricType.COUNTER,
         Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
         Tag.NAME.toString(),
         pipeID);
-    pipeTabletMemoryMap.remove(pipeID);
 
+    pipeParseFileCountMap.remove(pipeID);
     metricService.remove(
         MetricType.COUNTER,
         Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
         Tag.NAME.toString(),
         pipeID);
+  }
+
+  private void removePipeStats(final String pipeID) {
+    pipeTimerMap.remove(pipeID);
+    pipeRateMap.remove(pipeID);
+    pipeTabletCountMap.remove(pipeID);
+    pipeTabletMemoryMap.remove(pipeID);
     pipeParseFileCountMap.remove(pipeID);
   }
 
@@ -165,6 +173,8 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
     try {
       if (Objects.nonNull(metricService)) {
         removeMetrics(pipeID);
+      } else {
+        removePipeStats(pipeID);
       }
     } finally {
       pipe.remove(pipeID);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 99d45298794..41efbef9c04 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -104,10 +105,14 @@ public abstract class PipeTaskAgent {
 
   protected final PipeMetaKeeper pipeMetaKeeper;
   protected final PipeTaskManager pipeTaskManager;
+  private final Map<String, AtomicLong> 
pipeNameWithCreationTime2FloatingMemoryUsageInByteMap;
+  private final Map<String, Set<Long>> pipeName2CreationTimeSetMap;
 
   protected PipeTaskAgent() {
     pipeMetaKeeper = new PipeMetaKeeper();
     pipeTaskManager = new PipeTaskManager();
+    pipeNameWithCreationTime2FloatingMemoryUsageInByteMap = new 
ConcurrentHashMap<>();
+    pipeName2CreationTimeSetMap = new ConcurrentHashMap<>();
 
     // Help PipeEndPointRateLimiter to check if the pipe is still alive
     PipeEndPointRateLimiter.setTaskAgent(this);
@@ -601,6 +606,7 @@ public abstract class PipeTaskAgent {
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
+    cleanupFloatingMemoryUsageCounterIfNecessary(pipeName, creationTime);
 
     return true;
   }
@@ -639,6 +645,8 @@ public abstract class PipeTaskAgent {
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
+    cleanupFloatingMemoryUsageCounterIfNecessary(
+        pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
 
     return true;
   }
@@ -1170,6 +1178,57 @@ public abstract class PipeTaskAgent {
 
   ///////////////////////// Maintain meta info /////////////////////////
 
+  private static String generatePipeNameWithCreationTime(
+      final String pipeName, final long creationTime) {
+    return pipeName + "_" + creationTime;
+  }
+
+  private AtomicLong getOrCreateFloatingMemoryUsageCounter(
+      final String pipeName, final long creationTime) {
+    pipeName2CreationTimeSetMap
+        .computeIfAbsent(pipeName, key -> ConcurrentHashMap.newKeySet())
+        .add(creationTime);
+    return 
pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.computeIfAbsent(
+        generatePipeNameWithCreationTime(pipeName, creationTime), key -> new 
AtomicLong(0));
+  }
+
+  private void tryCleanupFloatingMemoryUsageCounter(
+      final String pipeName, final long creationTime, final AtomicLong 
counter) {
+    if (counter.get() != 0) {
+      return;
+    }
+
+    final PipeMeta currentPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (Objects.nonNull(currentPipeMeta)
+        && currentPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
+      return;
+    }
+
+    final String pipeNameWithCreationTime =
+        generatePipeNameWithCreationTime(pipeName, creationTime);
+    
pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.remove(pipeNameWithCreationTime,
 counter);
+
+    if (!pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.containsKey(
+        pipeNameWithCreationTime)) {
+      pipeName2CreationTimeSetMap.computeIfPresent(
+          pipeName,
+          (key, creationTimes) -> {
+            creationTimes.remove(creationTime);
+            return creationTimes.isEmpty() ? null : creationTimes;
+          });
+    }
+  }
+
+  private void cleanupFloatingMemoryUsageCounterIfNecessary(
+      final String pipeName, final long creationTime) {
+    final AtomicLong counter =
+        pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get(
+            generatePipeNameWithCreationTime(pipeName, creationTime));
+    if (Objects.nonNull(counter)) {
+      tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter);
+    }
+  }
+
   public long getPipeCreationTime(final String pipeName) {
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
@@ -1178,7 +1237,7 @@ public abstract class PipeTaskAgent {
   public String getPipeNameWithCreationTime(final String pipeName, final long 
creationTime) {
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     return pipeMeta == null
-        ? pipeName + "_" + creationTime
+        ? generatePipeNameWithCreationTime(pipeName, creationTime)
         : ((PipeTemporaryMetaInAgent) 
pipeMeta.getTemporaryMeta()).getPipeNameWithCreationTime();
   }
 
@@ -1192,22 +1251,23 @@ public abstract class PipeTaskAgent {
   }
 
   public long getAllFloatingMemoryUsageInByte() {
-    final AtomicLong bytes = new AtomicLong(0);
-    pipeMetaKeeper
-        .getPipeMetaList()
-        .forEach(
-            pipeMeta ->
-                bytes.addAndGet(
-                    ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
-                        .getFloatingMemoryUsageInByte()));
-    return bytes.get();
+    return 
pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.values().stream()
+        .mapToLong(AtomicLong::get)
+        .sum();
   }
 
   public long getFloatingMemoryUsageInByte(final String pipeName) {
-    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-    return pipeMeta == null
-        ? 0
-        : ((PipeTemporaryMetaInAgent) 
pipeMeta.getTemporaryMeta()).getFloatingMemoryUsageInByte();
+    final Set<Long> creationTimes = pipeName2CreationTimeSetMap.get(pipeName);
+    if (Objects.isNull(creationTimes)) {
+      return 0;
+    }
+
+    return creationTimes.stream()
+        .map(creationTime -> generatePipeNameWithCreationTime(pipeName, 
creationTime))
+        .map(pipeNameWithCreationTime2FloatingMemoryUsageInByteMap::get)
+        .filter(Objects::nonNull)
+        .mapToLong(AtomicLong::get)
+        .sum();
   }
 
   public void addFloatingMemoryUsageInByte(
@@ -1215,18 +1275,18 @@ public abstract class PipeTaskAgent {
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     // To avoid stale pipe before alter
     if (Objects.nonNull(pipeMeta) && 
pipeMeta.getStaticMeta().getCreationTime() == creationTime) {
-      ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
-          .addFloatingMemoryUsageInByte(sizeInByte);
+      getOrCreateFloatingMemoryUsageCounter(pipeName, 
creationTime).addAndGet(sizeInByte);
     }
   }
 
   public void decreaseFloatingMemoryUsageInByte(
       final String pipeName, final long creationTime, final long sizeInByte) {
-    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-    // To avoid stale pipe before alter
-    if (Objects.nonNull(pipeMeta) && 
pipeMeta.getStaticMeta().getCreationTime() == creationTime) {
-      ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
-          .decreaseFloatingMemoryUsageInByte(sizeInByte);
+    final AtomicLong counter =
+        pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get(
+            generatePipeNameWithCreationTime(pipeName, creationTime));
+    if (Objects.nonNull(counter)) {
+      counter.addAndGet(-sizeInByte);
+      tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter);
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java
new file mode 100644
index 00000000000..50a485df9d2
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task;
+
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class PipeTaskAgentTest {
+
+  @Test
+  public void testFloatingMemoryUsageSurvivesDropUntilLateRelease() throws 
IllegalPathException {
+    final DummyPipeTaskAgent agent = new DummyPipeTaskAgent();
+
+    agent.createPipeForTest(generatePipeMeta("pipe", 1L));
+    agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L);
+    Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte());
+    Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe"));
+
+    agent.dropPipeForTest("pipe", 1L);
+    Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte());
+    Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe"));
+
+    agent.createPipeForTest(generatePipeMeta("pipe", 2L));
+    agent.addFloatingMemoryUsageInByte("pipe", 2L, 20L);
+    Assert.assertEquals(120L, agent.getAllFloatingMemoryUsageInByte());
+    Assert.assertEquals(120L, agent.getFloatingMemoryUsageInByte("pipe"));
+
+    agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L);
+    Assert.assertEquals(20L, agent.getAllFloatingMemoryUsageInByte());
+    Assert.assertEquals(20L, agent.getFloatingMemoryUsageInByte("pipe"));
+
+    agent.dropPipeForTest("pipe", 2L);
+    agent.decreaseFloatingMemoryUsageInByte("pipe", 2L, 20L);
+    Assert.assertEquals(0L, agent.getAllFloatingMemoryUsageInByte());
+    Assert.assertEquals(0L, agent.getFloatingMemoryUsageInByte("pipe"));
+  }
+
+  @Test
+  public void testZeroFloatingMemoryUsageCounterIsCleanedAfterDrop() throws 
Exception {
+    final DummyPipeTaskAgent agent = new DummyPipeTaskAgent();
+
+    agent.createPipeForTest(generatePipeMeta("pipe", 1L));
+    agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L);
+    agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L);
+    agent.dropPipeForTest("pipe", 1L);
+
+    Assert.assertTrue(
+        getMapField(agent, 
"pipeNameWithCreationTime2FloatingMemoryUsageInByteMap").isEmpty());
+    Assert.assertTrue(getMapField(agent, 
"pipeName2CreationTimeSetMap").isEmpty());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Map<?, ?> getMapField(final PipeTaskAgent agent, final String 
fieldName)
+      throws NoSuchFieldException, IllegalAccessException {
+    final Field field = PipeTaskAgent.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return (Map<?, ?>) field.get(agent);
+  }
+
+  private static PipeMeta generatePipeMeta(final String pipeName, final long 
creationTime) {
+    return new PipeMeta(
+        new PipeStaticMeta(
+            pipeName, creationTime, new HashMap<>(), new HashMap<>(), new 
HashMap<>()),
+        new PipeRuntimeMeta(new ConcurrentHashMap<>()));
+  }
+
+  private static class DummyPipeTaskAgent extends PipeTaskAgent {
+
+    private boolean createPipeForTest(final PipeMeta pipeMeta) throws 
IllegalPathException {
+      return createPipe(pipeMeta);
+    }
+
+    private boolean dropPipeForTest(final String pipeName, final long 
creationTime) {
+      return dropPipe(pipeName, creationTime);
+    }
+
+    @Override
+    protected boolean isShutdown() {
+      return false;
+    }
+
+    @Override
+    protected void thawRate(final String pipeName, final long creationTime) {
+      // Do nothing
+    }
+
+    @Override
+    protected void freezeRate(final String pipeName, final long creationTime) {
+      // Do nothing
+    }
+
+    @Override
+    protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta 
pipeMetaFromCoordinator) {
+      return new HashMap<>();
+    }
+
+    @Override
+    protected void createPipeTask(
+        final int consensusGroupId,
+        final PipeStaticMeta pipeStaticMeta,
+        final org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta 
pipeTaskMeta) {
+      // Do nothing
+    }
+
+    @Override
+    protected void collectPipeMetaListInternal(
+        final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
+      // Do nothing
+    }
+
+    @Override
+    public void runPipeTasks(
+        final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> 
runSingle) {
+      pipeTasks.forEach(runSingle);
+    }
+  }
+}

Reply via email to