This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 282147773bd Pipe: Reduced the logs in ConfigNode caused by pipeMeta sync and heartbeat report (#12813)
282147773bd is described below

commit 282147773bd1d7a8a6cea3aed5bc5b751b9c4e3b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 26 22:02:18 2024 +0800

    Pipe: Reduced the logs in ConfigNode caused by pipeMeta sync and heartbeat report (#12813)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   | 37 ++++++++--------
 .../runtime/heartbeat/PipeHeartbeat.java           |  4 ++
 .../runtime/heartbeat/PipeHeartbeatParser.java     | 39 +++++++++++------
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  |  9 +---
 .../resource/PipeConfigNodeResourceManager.java    | 51 ++++++++++++++++++++++
 .../PipeConfigNodeSnapshotResourceManager.java     | 11 +----
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 16 ++++++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 10 ++---
 .../IoTDBDataNodeCacheLeaderClientManager.java     |  8 ++--
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  4 +-
 .../PipeConsensusTransferBatchReqBuilder.java      |  4 +-
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  4 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  6 +--
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 12 ++---
 .../tsfile/TsFileInsertionDataContainer.java       |  7 +--
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 11 ++---
 .../PipeRealtimeDataRegionHybridExtractor.java     |  6 +--
 .../realtime/assigner/DisruptorQueue.java          | 12 ++---
 .../iotdb/db/pipe/metric/PipeResourceMetrics.java  | 14 +++---
 .../downsampling/PartialPathLastObjectCache.java   |  4 +-
 ...nager.java => PipeDataNodeResourceManager.java} |  8 ++--
 .../db/pipe/resource/memory/PipeMemoryBlock.java   | 18 ++++----
 .../pipe/resource/tsfile/PipeTsFileResource.java   | 10 +++--
 .../resource/tsfile/PipeTsFileResourceManager.java |  4 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  | 34 +++++++--------
 .../hardlink/PipeWALHardlinkResourceManager.java   | 23 +++++-----
 .../selfhost/PipeWALSelfHostResourceManager.java   |  8 ++--
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  4 +-
 .../event/SubscriptionEventBinaryCache.java        |  8 ++--
 .../PipeDataNodeSnapshotResourceManagerTest.java   | 20 ++++-----
 .../commons}/pipe/resource/log/PipeLogManager.java |  7 ++-
 .../commons}/pipe/resource/log/PipeLogStatus.java  |  6 +--
 33 files changed, 250 insertions(+), 173 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index abfea097dbf..4536789daf6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -32,6 +33,7 @@ import 
org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
 import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
 import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
@@ -50,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -57,8 +60,6 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfigNodeTaskAgent.class);
 
-  private final AtomicLong lastLogPrintedTime = new AtomicLong(0);
-
   @Override
   protected boolean isShutdown() {
     return PipeConfigNodeAgent.runtime().isShutdown();
@@ -214,11 +215,13 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final boolean shouldPrintLog =
-          System.currentTimeMillis() - lastLogPrintedTime.get() > 1000 * 60 * 
10; // 10 minutes
-      if (shouldPrintLog) {
-        lastLogPrintedTime.set(System.currentTimeMillis());
-      }
+      final Optional<Logger> logger =
+          PipeConfigNodeResourceManager.log()
+              .schedule(
+                  PipeConfigNodeTaskAgent.class,
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeMetaKeeper.getPipeMetaCount());
 
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -227,20 +230,20 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
         final long remainingEventCount =
             PipeConfigRegionExtractorMetrics.getInstance()
                 .getRemainingEventCount(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
-        final double remainingTime =
+        final double estimatedRemainingTime =
             PipeConfigNodeRemainingTimeMetrics.getInstance()
                 .getRemainingTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
 
         pipeRemainingEventCountList.add(remainingEventCount);
-        pipeRemainingTimeList.add(remainingTime);
-
-        if (shouldPrintLog) {
-          LOGGER.info(
-              "Reporting pipe meta: {}, remainingEventCount: {}, 
remainingTime: {}",
-              pipeMeta.coreReportMessage(),
-              remainingEventCount,
-              remainingTime);
-        }
+        pipeRemainingTimeList.add(estimatedRemainingTime);
+
+        logger.ifPresent(
+            l ->
+                l.info(
+                    "Reporting pipe meta: {}, remainingEventCount: {}, 
estimatedRemainingTime: {}",
+                    pipeMeta.coreReportMessage(),
+                    remainingEventCount,
+                    estimatedRemainingTime));
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (final IOException e) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 4489bf5b957..1f54b8745d5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -63,6 +63,10 @@ public class PipeHeartbeat {
     }
   }
 
+  public int getPipeMetaSize() {
+    return pipeMetaMap.size();
+  }
+
   public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
     return pipeMetaMap.get(pipeStaticMeta);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 790799cae72..18c964f06c1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -30,6 +32,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 
 import org.slf4j.Logger;
@@ -153,7 +156,11 @@ public class PipeHeartbeatParser {
             
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
         
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
         if (uncompletedDataNodeIds.isEmpty()) {
-          
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
+          pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
+          LOGGER.info(
+              "Detected completion of pipe {}, static meta: {}, remove it.",
+              staticMeta.getPipeName(),
+              staticMeta);
           needWriteConsensusOnConfigNodes.set(true);
           needPushPipeMetaToDataNodes.set(true);
           continue;
@@ -193,20 +200,26 @@ public class PipeHeartbeatParser {
                 .getValue()
                 .getProgressIndex()
                 .equals(runtimeMetaFromAgent.getProgressIndex()))) {
-          LOGGER.info(
-              "Updating progress index for (pipe name: {}, consensus group id: 
{}) ... "
-                  + "Progress index on coordinator: {}, progress index from 
agent: {}",
-              pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
-              runtimeMetaFromCoordinator.getKey(),
-              runtimeMetaFromCoordinator.getValue().getProgressIndex(),
-              runtimeMetaFromAgent.getProgressIndex());
-          LOGGER.info(
-              "Progress index for (pipe name: {}, consensus group id: {}) is 
updated to {}",
-              pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
-              runtimeMetaFromCoordinator.getKey(),
+          final ProgressIndex updatedProgressIndex =
               runtimeMetaFromCoordinator
                   .getValue()
-                  
.updateProgressIndex(runtimeMetaFromAgent.getProgressIndex()));
+                  
.updateProgressIndex(runtimeMetaFromAgent.getProgressIndex());
+          PipeConfigNodeResourceManager.log()
+              .schedule(
+                  PipeHeartbeatParser.class,
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                  pipeHeartbeat.getPipeMetaSize())
+              .ifPresent(
+                  l ->
+                      l.info(
+                          "Updated progress index for (pipe name: {}, 
consensus group id: {}) ... "
+                              + "Progress index on coordinator: {}, progress 
index from agent: {}, updated progressIndex: {}",
+                          
pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
+                          runtimeMetaFromCoordinator.getKey(),
+                          
runtimeMetaFromCoordinator.getValue().getProgressIndex(),
+                          runtimeMetaFromAgent.getProgressIndex(),
+                          updatedProgressIndex));
 
           needWriteConsensusOnConfigNodes.set(true);
         }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index 56c9d012c3c..40918e1b6a7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import 
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -101,12 +101,7 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern) {
-    super(
-        pipeName,
-        creationTime,
-        pipeTaskMeta,
-        pattern,
-        PipeConfigNodeSnapshotResourceManager.getInstance());
+    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeConfigNodeResourceManager.snapshot());
     this.snapshotPath = snapshotPath;
     this.templateFilePath = Objects.nonNull(templateFilePath) ? 
templateFilePath : "";
     this.fileType = type;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
new file mode 100644
index 00000000000..3500d26e3c2
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.resource;
+
+import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
+
+public class PipeConfigNodeResourceManager {
+
+  private final PipeSnapshotResourceManager pipeSnapshotResourceManager;
+  private final PipeLogManager pipeLogManager;
+
+  public static PipeSnapshotResourceManager snapshot() {
+    return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE
+        .pipeSnapshotResourceManager;
+  }
+
+  public static PipeLogManager log() {
+    return 
PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager;
+  }
+
+  ///////////////////////////// SINGLETON /////////////////////////////
+
+  private PipeConfigNodeResourceManager() {
+    pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager();
+    pipeLogManager = new PipeLogManager();
+  }
+
+  private static class PipeResourceManagerHolder {
+    private static final PipeConfigNodeResourceManager INSTANCE =
+        new PipeConfigNodeResourceManager();
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
index 4ab5a540940..17769654560 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
@@ -27,16 +27,7 @@ import java.util.HashSet;
 
 public class PipeConfigNodeSnapshotResourceManager extends 
PipeSnapshotResourceManager {
 
-  private PipeConfigNodeSnapshotResourceManager() {
+  public PipeConfigNodeSnapshotResourceManager() {
     super(new 
HashSet<>(Collections.singletonList(IoTDBConstant.CONSENSUS_FOLDER_NAME)));
   }
-
-  private static class PipeConfigNodeSnapshotResourceManagerHolder {
-    private static final PipeConfigNodeSnapshotResourceManager INSTANCE =
-        new PipeConfigNodeSnapshotResourceManager();
-  }
-
-  public static synchronized PipeConfigNodeSnapshotResourceManager 
getInstance() {
-    return PipeConfigNodeSnapshotResourceManagerHolder.INSTANCE;
-  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 843bea6d8e3..5996545461d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -41,6 +42,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePla
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
+import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
@@ -63,6 +65,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -154,7 +157,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void checkBeforeCreatePipeInternal(TCreatePipeReq createPipeRequest)
+  private void checkBeforeCreatePipeInternal(final TCreatePipeReq 
createPipeRequest)
       throws PipeException {
     if (!isPipeExisted(createPipeRequest.getPipeName())) {
       return;
@@ -556,11 +559,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
     pipeMetaKeeper.clear();
 
+    // This method is only triggered by pipe sync / meta report currently
+    // And is guaranteed to print log finally
+    final Optional<Logger> logger =
+        PipeConfigNodeResourceManager.log()
+            .schedule(
+                PipeTaskInfo.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
     plan.getPipeMetaList()
         .forEach(
             pipeMeta -> {
               
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
-              LOGGER.info("Recording pipe meta: {}", pipeMeta);
+              logger.ifPresent(l -> l.info("Recording pipe meta: {}", 
pipeMeta));
             });
 
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f1fd633c769..c91c76d36be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -47,7 +47,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInser
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
 import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskBuilder;
@@ -314,7 +314,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
-          PipeResourceManager.log()
+          PipeDataNodeResourceManager.log()
               .schedule(
                   PipeDataNodeTaskAgent.class,
                   
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
@@ -393,7 +393,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
-          PipeResourceManager.log()
+          PipeDataNodeResourceManager.log()
               .schedule(
                   PipeDataNodeTaskAgent.class,
                   
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
@@ -515,7 +515,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
     try {
       final long linkedButDeletedTsFileSize =
-          PipeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
+          
PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileSize();
       final double totalDisk =
           MetricService.getInstance()
               .getAutoGauge(
@@ -538,7 +538,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private boolean mayMemTablePinnedCountReachDangerousThreshold() {
-    return PipeResourceManager.wal().getPinnedWalCount()
+    return PipeDataNodeResourceManager.wal().getPinnedWalCount()
         >= 10 * 
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index 965a2a068bf..e45954e7e4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.connector.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -51,15 +51,15 @@ public interface IoTDBDataNodeCacheLeaderClientManager {
 
     public LeaderCacheManager() {
       final long initMemorySizeInBytes =
-          PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+          PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 
10;
       final long maxMemorySizeInBytes =
           (long)
-              (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+              (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
                   * CONFIG.getPipeLeaderCacheMemoryUsagePercentage());
 
       // properties required by pipe memory control framework
       final PipeMemoryBlock allocatedMemoryBlock =
-          PipeResourceManager.memory()
+          PipeDataNodeResourceManager.memory()
               .tryAllocate(initMemorySizeInBytes)
               .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
               .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 0b97487b04e..a2d7315ae0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -61,7 +61,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
     super(maxDelayInMs);
     this.allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(requestMaxBatchSizeInBytes)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
             .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index bf116c36798..cdecc272772 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -28,7 +28,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -88,7 +88,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
             CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
 
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(requestMaxBatchSizeInBytes)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
             .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 60bbf95feee..39bd51d5f53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 
@@ -89,7 +89,7 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern) {
-    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeResourceManager.snapshot());
+    super(pipeName, creationTime, pipeTaskMeta, pattern, 
PipeDataNodeResourceManager.snapshot());
     this.mTreeSnapshotPath = mTreeSnapshotPath;
     this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? 
tagLogSnapshotPath : "";
     this.databaseName = databaseName;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index faba8482c4d..de3e39ed412 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -132,7 +132,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
-      PipeResourceManager.wal().pin(walEntryHandler);
+      PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       return true;
     } catch (final Exception e) {
       LOGGER.warn(
@@ -147,7 +147,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
-      PipeResourceManager.wal().unpin(walEntryHandler);
+      PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
       // Release the containers' memory.
       if (dataContainers != null) {
         dataContainers.clear();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 9769299ae68..05227ce0408 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -109,7 +109,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
-    allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocateWithRetry(tablet);
+    allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet);
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index b7a883712df..05a5a6ff0f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -196,9 +196,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
-      tsFile = PipeResourceManager.tsfile().increaseFileReference(tsFile, 
true, resource);
+      tsFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, 
resource);
       if (isWithMod) {
-        modFile = PipeResourceManager.tsfile().increaseFileReference(modFile, 
false, null);
+        modFile = 
PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, 
null);
       }
       return true;
     } catch (final Exception e) {
@@ -214,9 +214,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
-      PipeResourceManager.tsfile().decreaseFileReference(tsFile);
+      PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile);
       if (isWithMod) {
-        PipeResourceManager.tsfile().decreaseFileReference(modFile);
+        PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
       }
       return true;
     } catch (final Exception e) {
@@ -291,7 +291,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
     try {
       final Map<IDeviceID, Boolean> deviceIsAlignedMap =
-          PipeResourceManager.tsfile()
+          PipeDataNodeResourceManager.tsfile()
               .getDeviceIsAlignedMapFromCache(
                   
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
       final Set<IDeviceID> deviceSet =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6366cf8bb95..4e324bb8afe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -107,7 +107,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
     this.sourceEvent = sourceEvent;
 
     try {
-      final PipeTsFileResourceManager tsFileResourceManager = 
PipeResourceManager.tsfile();
+      final PipeTsFileResourceManager tsFileResourceManager = 
PipeDataNodeResourceManager.tsfile();
       final Map<IDeviceID, List<String>> deviceMeasurementsMap;
 
       // TsFileReader is not thread-safe, so we need to create it here and 
close it later.
@@ -138,7 +138,8 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
         memoryRequiredInBytes +=
             
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
       }
-      allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+      allocatedMemoryBlock =
+          
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
 
       // Filter again to get the final deviceMeasurementsMap that exactly 
matches the pattern.
       deviceMeasurementsMapIterator =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 1f36e98bec0..34d331a0b6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -446,7 +446,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
               // Pin the resource, in case the file is removed by compaction 
or anything.
               // Will unpin it after the PipeTsFileInsertionEvent is created 
and pinned.
               try {
-                PipeResourceManager.tsfile().pinTsFileResource(resource, 
shouldTransferModFile);
+                PipeDataNodeResourceManager.tsfile()
+                    .pinTsFileResource(resource, shouldTransferModFile);
               } catch (final IOException e) {
                 LOGGER.warn("Pipe: failed to pin TsFileResource {}", 
resource.getTsFilePath());
               }
@@ -507,7 +508,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     final Set<IDeviceID> deviceSet;
     try {
       final Map<IDeviceID, Boolean> deviceIsAlignedMap =
-          PipeResourceManager.tsfile()
+          PipeDataNodeResourceManager.tsfile()
               .getDeviceIsAlignedMapFromCache(
                   
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
       deviceSet =
@@ -595,7 +596,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
     try {
-      PipeResourceManager.tsfile().unpinTsFileResource(resource);
+      PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
     } catch (final IOException e) {
       LOGGER.warn(
           "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
@@ -627,7 +628,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       pendingQueue.forEach(
           resource -> {
             try {
-              PipeResourceManager.tsfile().unpinTsFileResource(resource);
+              
PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
             } catch (final IOException e) {
               LOGGER.warn(
                   "Pipe {}@{}: failed to unpin TsFileResource after dropping 
pipe, original path: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 9854fa963a2..15a020cd577 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionExtractorMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -222,7 +222,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean mayMemTablePinnedCountReachDangerousThreshold() {
-    return PipeResourceManager.wal().getPinnedWalCount()
+    return PipeDataNodeResourceManager.wal().getPinnedWalCount()
         >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
   }
 
@@ -240,7 +240,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean mayTsFileLinkedCountReachDangerousThreshold() {
-    return PipeResourceManager.tsfile().getLinkedTsfileCount()
+    return PipeDataNodeResourceManager.tsfile().getLinkedTsfileCount()
         >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
index 3c738a73deb..0a576142a39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
@@ -48,14 +48,14 @@ public class DisruptorQueue {
 
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
 
-  public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
+  public DisruptorQueue(final EventHandler<PipeRealtimeEvent> eventHandler) {
     final PipeConfig config = PipeConfig.getInstance();
     final int ringBufferSize = 
config.getPipeExtractorAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
         config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
 
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(
                 ringBufferSize * ringBufferEntrySizeInBytes, currentSize -> 
currentSize / 2);
 
@@ -80,8 +80,8 @@ public class DisruptorQueue {
     ringBuffer = disruptor.start();
   }
 
-  public void publish(PipeRealtimeEvent event) {
-    EnrichedEvent internalEvent = event.getEvent();
+  public void publish(final PipeRealtimeEvent event) {
+    final EnrichedEvent internalEvent = event.getEvent();
     if (internalEvent instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
     }
@@ -104,7 +104,7 @@ public class DisruptorQueue {
       return event;
     }
 
-    public void setEvent(PipeRealtimeEvent event) {
+    public void setEvent(final PipeRealtimeEvent event) {
       this.event = event;
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
index 3c28e30e437..04fb647a825 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.metric;
 
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
@@ -39,19 +39,19 @@ public class PipeResourceMetrics implements IMetricSet {
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
   @Override
-  public void bindTo(AbstractMetricService metricService) {
+  public void bindTo(final AbstractMetricService metricService) {
     // pipe memory related
     metricService.createAutoGauge(
         Metric.PIPE_MEM.toString(),
         MetricLevel.IMPORTANT,
-        PipeResourceManager.memory(),
+        PipeDataNodeResourceManager.memory(),
         PipeMemoryManager::getUsedMemorySizeInBytes,
         Tag.NAME.toString(),
         PIPE_USED_MEMORY);
     metricService.createAutoGauge(
         Metric.PIPE_MEM.toString(),
         MetricLevel.IMPORTANT,
-        PipeResourceManager.memory(),
+        PipeDataNodeResourceManager.memory(),
         PipeMemoryManager::getTotalMemorySizeInBytes,
         Tag.NAME.toString(),
         PIPE_TOTAL_MEMORY);
@@ -59,17 +59,17 @@ public class PipeResourceMetrics implements IMetricSet {
     metricService.createAutoGauge(
         Metric.PIPE_PINNED_MEMTABLE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        PipeResourceManager.wal(),
+        PipeDataNodeResourceManager.wal(),
         PipeWALResourceManager::getPinnedWalCount);
     metricService.createAutoGauge(
         Metric.PIPE_LINKED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        PipeResourceManager.tsfile(),
+        PipeDataNodeResourceManager.tsfile(),
         PipeTsFileResourceManager::getLinkedTsfileCount);
   }
 
   @Override
-  public void unbindFrom(AbstractMetricService metricService) {
+  public void unbindFrom(final AbstractMetricService metricService) {
     // pipe memory related
     metricService.remove(
         MetricType.AUTO_GAUGE, Metric.PIPE_MEM.toString(), 
Tag.NAME.toString(), PIPE_USED_MEMORY);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
index ece3963c6f4..3e423f5c437 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.processor.downsampling;
 
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.utils.MemUtils;
 
@@ -42,7 +42,7 @@ public abstract class PartialPathLastObjectCache<T> 
implements AutoCloseable {
 
   protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(memoryLimitInBytes)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
             .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
similarity index 93%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
index 9307b46a549..83901ddcfca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.resource;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
-import org.apache.iotdb.db.pipe.resource.log.PipeLogManager;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import 
org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.db.pipe.resource.wal.selfhost.PipeWALSelfHostResourceMan
 
 import java.util.concurrent.atomic.AtomicReference;
 
-public class PipeResourceManager {
+public class PipeDataNodeResourceManager {
 
   private final PipeTsFileResourceManager pipeTsFileResourceManager;
   private final AtomicReference<PipeWALResourceManager> pipeWALResourceManager;
@@ -71,7 +71,7 @@ public class PipeResourceManager {
 
   ///////////////////////////// SINGLETON /////////////////////////////
 
-  private PipeResourceManager() {
+  private PipeDataNodeResourceManager() {
     pipeTsFileResourceManager = new PipeTsFileResourceManager();
     pipeWALResourceManager = new AtomicReference<>();
     pipeSnapshotResourceManager = new PipeDataNodeSnapshotResourceManager();
@@ -80,6 +80,6 @@ public class PipeResourceManager {
   }
 
   private static class PipeResourceManagerHolder {
-    private static final PipeResourceManager INSTANCE = new 
PipeResourceManager();
+    private static final PipeDataNodeResourceManager INSTANCE = new 
PipeDataNodeResourceManager();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 06dfc0f64a8..8ebe90b388c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.resource.memory;
 
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeMemoryBlock.class);
 
-  private final PipeMemoryManager pipeMemoryManager = 
PipeResourceManager.memory();
+  private final PipeMemoryManager pipeMemoryManager = 
PipeDataNodeResourceManager.memory();
 
   private final ReentrantLock lock = new ReentrantLock();
 
@@ -48,7 +48,7 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   private volatile boolean isReleased = false;
 
-  public PipeMemoryBlock(long memoryUsageInBytes) {
+  public PipeMemoryBlock(final long memoryUsageInBytes) {
     this.memoryUsageInBytes.set(memoryUsageInBytes);
   }
 
@@ -56,26 +56,26 @@ public class PipeMemoryBlock implements AutoCloseable {
     return memoryUsageInBytes.get();
   }
 
-  public void setMemoryUsageInBytes(long memoryUsageInBytes) {
+  public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
     this.memoryUsageInBytes.set(memoryUsageInBytes);
   }
 
-  public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
+  public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) 
{
     this.shrinkMethod.set(shrinkMethod);
     return this;
   }
 
-  public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long> 
shrinkCallback) {
+  public PipeMemoryBlock setShrinkCallback(final BiConsumer<Long, Long> 
shrinkCallback) {
     this.shrinkCallback.set(shrinkCallback);
     return this;
   }
 
-  public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
+  public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) 
{
     this.expandMethod.set(extendMethod);
     return this;
   }
 
-  public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long> 
expandCallback) {
+  public PipeMemoryBlock setExpandCallback(final BiConsumer<Long, Long> 
expandCallback) {
     this.expandCallback.set(expandCallback);
     return this;
   }
@@ -180,7 +180,7 @@ public class PipeMemoryBlock implements AutoCloseable {
             lock.unlock();
           }
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         Thread.currentThread().interrupt();
         LOGGER.warn("Interrupted while waiting for the lock.", e);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 090af91fb3c..95be9281d5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.resource.tsfile;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -67,7 +67,9 @@ public class PipeTsFileResource implements AutoCloseable {
   private Map<String, TSDataType> measurementDataTypeMap = null;
 
   public PipeTsFileResource(
-      File hardlinkOrCopiedFile, boolean isTsFile, TsFileResource 
tsFileResource) {
+      final File hardlinkOrCopiedFile,
+      final boolean isTsFile,
+      final TsFileResource tsFileResource) {
     this.hardlinkOrCopiedFile = hardlinkOrCopiedFile;
     this.isTsFile = isTsFile;
     this.tsFileResource = tsFileResource;
@@ -190,7 +192,7 @@ public class PipeTsFileResource implements AutoCloseable {
     // Only allocate when pipe memory used is less than 50%, because memory 
here
     // is hard to shrink and may consume too much memory.
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .forceAllocateIfSufficient(
                 
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
                 MEMORY_SUFFICIENT_THRESHOLD);
@@ -226,7 +228,7 @@ public class PipeTsFileResource implements AutoCloseable {
 
     // Allocate again for the cached objects.
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .forceAllocateIfSufficient(memoryRequiredInBytes, 
MEMORY_SUFFICIENT_THRESHOLD);
     if (allocatedMemoryBlock == null) {
       LOGGER.info(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 1cf6732786e..e40bafe03b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
@@ -81,7 +81,7 @@ public class PipeTsFileResourceManager {
     final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
         hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
     final Optional<Logger> logger =
-        PipeResourceManager.log()
+        PipeDataNodeResourceManager.log()
             .schedule(
                 PipeTsFileResourceManager.class,
                 PipeConfig.getInstance().getPipeTsFilePinMaxLogNumPerRound(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 06b5d68ef96..9adb976c354 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.resource.wal;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
 import org.slf4j.Logger;
@@ -43,15 +43,15 @@ public abstract class PipeWALResourceManager {
   protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
 
   private static final int SEGMENT_LOCK_COUNT = 32;
-  private final ReentrantLock[] memtableIdSegmentLocks;
+  private final ReentrantLock[] memTableIdSegmentLocks;
 
   protected PipeWALResourceManager() {
-    // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple 
threads
+    // memTableIdToPipeWALResourceMap can be concurrently accessed by multiple 
threads
     memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
 
-    memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+    memTableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
     for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
-      memtableIdSegmentLocks[i] = new ReentrantLock();
+      memTableIdSegmentLocks[i] = new ReentrantLock();
     }
 
     PipeDataNodeAgent.runtime()
@@ -65,7 +65,7 @@ public abstract class PipeWALResourceManager {
     final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
         memtableIdToPipeWALResourceMap.entrySet().iterator();
     final Optional<Logger> logger =
-        PipeResourceManager.log()
+        PipeDataNodeResourceManager.log()
             .schedule(
                 PipeWALResourceManager.class,
                 PipeConfig.getInstance().getPipeWalPinMaxLogNumPerRound(),
@@ -76,7 +76,7 @@ public abstract class PipeWALResourceManager {
       while (iterator.hasNext()) {
         final Map.Entry<Long, PipeWALResource> entry = iterator.next();
         final ReentrantLock lock =
-            memtableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
+            memTableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
 
         lock.lock();
         try {
@@ -94,7 +94,7 @@ public abstract class PipeWALResourceManager {
           lock.unlock();
         }
       }
-    } catch (ConcurrentModificationException e) {
+    } catch (final ConcurrentModificationException e) {
       LOGGER.error(
           "Concurrent modification issues happened, skipping the WAL in this 
round of ttl check",
           e);
@@ -102,34 +102,34 @@ public abstract class PipeWALResourceManager {
   }
 
   public final void pin(final WALEntryHandler walEntryHandler) throws 
IOException {
-    final long memtableId = walEntryHandler.getMemTableId();
-    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
+    final long memTableId = walEntryHandler.getMemTableId();
+    final ReentrantLock lock = memTableIdSegmentLocks[(int) (memTableId % 
SEGMENT_LOCK_COUNT)];
 
     lock.lock();
     try {
-      pinInternal(memtableId, walEntryHandler);
+      pinInternal(memTableId, walEntryHandler);
     } finally {
       lock.unlock();
     }
   }
 
-  protected abstract void pinInternal(long memtableId, WALEntryHandler 
walEntryHandler)
+  protected abstract void pinInternal(final long memTableId, final 
WALEntryHandler walEntryHandler)
       throws IOException;
 
   public final void unpin(final WALEntryHandler walEntryHandler) throws 
IOException {
-    final long memtableId = walEntryHandler.getMemTableId();
-    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
+    final long memTableId = walEntryHandler.getMemTableId();
+    final ReentrantLock lock = memTableIdSegmentLocks[(int) (memTableId % 
SEGMENT_LOCK_COUNT)];
 
     lock.lock();
     try {
-      unpinInternal(memtableId, walEntryHandler);
+      unpinInternal(memTableId, walEntryHandler);
     } finally {
       lock.unlock();
     }
   }
 
-  protected abstract void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler)
-      throws IOException;
+  protected abstract void unpinInternal(
+      final long memTableId, final WALEntryHandler walEntryHandler) throws 
IOException;
 
   public int getPinnedWalCount() {
     return Objects.nonNull(memtableIdToPipeWALResourceMap)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
index acd64cf23b4..7570b83fc73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/hardlink/PipeWALHardlinkResourceManager.java
@@ -36,15 +36,15 @@ import java.util.Map;
 public class PipeWALHardlinkResourceManager extends PipeWALResourceManager {
 
   @Override
-  protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler) 
{
+  protected void pinInternal(final long memTableId, final WALEntryHandler 
walEntryHandler) {
     memtableIdToPipeWALResourceMap
-        .computeIfAbsent(memtableId, id -> new 
PipeWALHardlinkResource(walEntryHandler, this))
+        .computeIfAbsent(memTableId, id -> new 
PipeWALHardlinkResource(walEntryHandler, this))
         .pin();
   }
 
   @Override
-  protected void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler) {
-    memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+  protected void unpinInternal(final long memTableId, final WALEntryHandler 
walEntryHandler) {
+    memtableIdToPipeWALResourceMap.get(memTableId).unpin();
   }
 
   //////////////////////////// hardlink related ////////////////////////////
@@ -64,7 +64,7 @@ public class PipeWALHardlinkResourceManager extends 
PipeWALResourceManager {
    * @return the hardlink
    * @throws IOException when create hardlink failed
    */
-  public synchronized File increaseFileReference(File file) throws IOException 
{
+  public synchronized File increaseFileReference(final File file) throws 
IOException {
     // if the file is already a hardlink, just increase reference count and 
return it
     if (increaseReferenceIfExists(file.getPath())) {
       return file;
@@ -83,17 +83,17 @@ public class PipeWALHardlinkResourceManager extends 
PipeWALResourceManager {
     return createHardlink(file, hardlink);
   }
 
-  private boolean increaseReferenceIfExists(String path) {
+  private boolean increaseReferenceIfExists(final String path) {
     hardlinkToReferenceMap.computeIfPresent(path, (k, v) -> v + 1);
     return hardlinkToReferenceMap.containsKey(path);
   }
 
   // TODO: Check me! Make sure the file is not a hardlink.
   // TODO: IF user specify a wal by config, will the method work?
-  private static File getHardlinkInPipeWALDir(File file) throws IOException {
+  private static File getHardlinkInPipeWALDir(final File file) throws 
IOException {
     try {
       return new File(getPipeWALDirPath(file), getRelativeFilePath(file));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new IOException(
           String.format(
               "failed to get hardlink in pipe dir " + "for file %s, it is not 
a wal",
@@ -128,7 +128,8 @@ public class PipeWALHardlinkResourceManager extends 
PipeWALResourceManager {
     return builder.toString();
   }
 
-  private static File createHardlink(File sourceFile, File hardlink) throws 
IOException {
+  private static File createHardlink(final File sourceFile, final File 
hardlink)
+      throws IOException {
     if (!hardlink.getParentFile().exists() && 
!hardlink.getParentFile().mkdirs()) {
       throw new IOException(
           String.format(
@@ -149,7 +150,7 @@ public class PipeWALHardlinkResourceManager extends 
PipeWALResourceManager {
    * @param hardlink the hardlinked file
    * @throws IOException when delete file failed
    */
-  public synchronized void decreaseFileReference(File hardlink) throws 
IOException {
+  public synchronized void decreaseFileReference(final File hardlink) throws 
IOException {
     final Integer updatedReference =
         hardlinkToReferenceMap.computeIfPresent(
             hardlink.getPath(), (file, reference) -> reference - 1);
@@ -161,7 +162,7 @@ public class PipeWALHardlinkResourceManager extends 
PipeWALResourceManager {
   }
 
   @TestOnly
-  public synchronized int getFileReferenceCount(File hardlink) {
+  public synchronized int getFileReferenceCount(final File hardlink) {
     return hardlinkToReferenceMap.getOrDefault(hardlink.getPath(), 0);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
index 94404eafbed..c7fe0accda2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/selfhost/PipeWALSelfHostResourceManager.java
@@ -25,14 +25,14 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 public class PipeWALSelfHostResourceManager extends PipeWALResourceManager {
 
   @Override
-  protected void pinInternal(long memtableId, WALEntryHandler walEntryHandler) 
{
+  protected void pinInternal(final long memTableId, final WALEntryHandler 
walEntryHandler) {
     memtableIdToPipeWALResourceMap
-        .computeIfAbsent(memtableId, id -> new 
PipeWALSelfHostResource(walEntryHandler))
+        .computeIfAbsent(memTableId, id -> new 
PipeWALSelfHostResource(walEntryHandler))
         .pin();
   }
 
   @Override
-  protected void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler) {
-    memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+  protected void unpinInternal(final long memTableId, final WALEntryHandler 
walEntryHandler) {
+    memtableIdToPipeWALResourceMap.get(memTableId).unpin();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 38e010db7db..777545c738b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.metric.PipeWALInsertNodeCacheMetrics;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -78,7 +78,7 @@ public class WALInsertNodeCache {
                 (double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
                 CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
     allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
             .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
index 5f7ea9adc5c..b8bf26723c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.subscription.event;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -93,15 +93,15 @@ public class SubscriptionEventBinaryCache {
 
   private SubscriptionEventBinaryCache() {
     final long initMemorySizeInBytes =
-        PipeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
+        PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
     final long maxMemorySizeInBytes =
         (long)
-            (PipeResourceManager.memory().getTotalMemorySizeInBytes()
+            (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
                 * 
PipeConfig.getInstance().getSubscriptionCacheMemoryUsagePercentage());
 
     // properties required by pipe memory control framework
     this.allocatedMemoryBlock =
-        PipeResourceManager.memory()
+        PipeDataNodeResourceManager.memory()
             .tryAllocate(initMemorySizeInBytes)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
             .setShrinkCallback(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
index e56027ca432..cf0971c9bb9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeSnapshotResourceManagerTest.java
@@ -57,7 +57,7 @@ public class PipeDataNodeSnapshotResourceManagerTest {
 
   @Before
   public void setUp() throws Exception {
-    File testDir = new File(ROOT_DIR);
+    final File testDir = new File(ROOT_DIR);
     if (testDir.exists()) {
       FileUtils.deleteFileOrDirectory(testDir);
     }
@@ -70,7 +70,7 @@ public class PipeDataNodeSnapshotResourceManagerTest {
 
   @After
   public void tearDown() throws Exception {
-    File testDir = new File(ROOT_DIR);
+    final File testDir = new File(ROOT_DIR);
     if (testDir.exists()) {
       FileUtils.deleteFileOrDirectory(testDir);
     }
@@ -79,37 +79,37 @@ public class PipeDataNodeSnapshotResourceManagerTest {
   @Test
   public void test() {
     try {
-      PipeResourceManager.snapshot()
+      PipeDataNodeResourceManager.snapshot()
           .increaseSnapshotReference(CONSENSUS_SNAPSHOT_DIR + File.separator + 
FILE);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       Assert.fail();
     }
 
     Assert.assertEquals(
         1,
-        PipeResourceManager.snapshot()
+        PipeDataNodeResourceManager.snapshot()
             .getSnapshotReferenceCount(PIPE_CONSENSUS_SNAPSHOT_DIR + 
File.separator + FILE));
     Assert.assertTrue(new File(PIPE_CONSENSUS_SNAPSHOT_DIR, FILE).exists());
 
-    PipeResourceManager.snapshot()
+    PipeDataNodeResourceManager.snapshot()
         .decreaseSnapshotReference(PIPE_CONSENSUS_SNAPSHOT_DIR + 
File.separator + FILE);
 
     Assert.assertEquals(
         0,
-        PipeResourceManager.snapshot()
+        PipeDataNodeResourceManager.snapshot()
             .getSnapshotReferenceCount(PIPE_CONSENSUS_SNAPSHOT_DIR + 
File.separator + FILE));
     Assert.assertFalse(new File(PIPE_CONSENSUS_SNAPSHOT_DIR, FILE).exists());
 
     try {
-      PipeResourceManager.snapshot()
+      PipeDataNodeResourceManager.snapshot()
           .increaseSnapshotReference(WRONG_SNAPSHOT_DIR + File.separator + 
FILE);
       Assert.fail();
-    } catch (IOException e) {
+    } catch (final IOException e) {
     }
 
     Assert.assertEquals(
         0,
-        PipeResourceManager.snapshot()
+        PipeDataNodeResourceManager.snapshot()
             .getSnapshotReferenceCount(PIPE_WRONG_SNAPSHOT_DIR + 
File.separator + FILE));
     Assert.assertFalse(new File(PIPE_WRONG_SNAPSHOT_DIR, FILE).exists());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
similarity index 88%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index c7f6e445b6a..8e18fcfa546 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.log;
+package org.apache.iotdb.commons.pipe.resource.log;
 
 import org.slf4j.Logger;
 
@@ -31,7 +31,10 @@ public class PipeLogManager {
       new ConcurrentHashMap<>();
 
   public Optional<Logger> schedule(
-      Class<?> logClass, int maxAverageScale, int maxLogInterval, int scale) {
+      final Class<?> logClass,
+      final int maxAverageScale,
+      final int maxLogInterval,
+      final int scale) {
     return logClass2LogStatusMap
         .computeIfAbsent(
             logClass, k -> new PipeLogStatus(logClass, maxAverageScale, 
maxLogInterval))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
similarity index 87%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
index 67355dcc01a..9348708281f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipeLogStatus.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.log;
+package org.apache.iotdb.commons.pipe.resource.log;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,14 +33,14 @@ class PipeLogStatus {
   private final int maxLogInterval;
   private final AtomicLong currentRounds = new AtomicLong(0);
 
-  PipeLogStatus(Class<?> logClass, int maxAverageScale, int maxLogInterval) {
+  PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int 
maxLogInterval) {
     logger = LoggerFactory.getLogger(logClass);
 
     this.maxAverageScale = maxAverageScale;
     this.maxLogInterval = maxLogInterval;
   }
 
-  synchronized Optional<Logger> schedule(int scale) {
+  synchronized Optional<Logger> schedule(final int scale) {
     if (currentRounds.incrementAndGet()
         >= Math.min((int) Math.ceil((double) scale / maxAverageScale), 
maxLogInterval)) {
       currentRounds.set(0);

Reply via email to