This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c539697c7e Pipe: Setup PipeLogger to reduce the selected pipe logs & 
Switched some useless logger for pipe in configNode to debug level (#16296)
1c539697c7e is described below

commit 1c539697c7e56870fb798307ac6b85c5a509f898
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 29 09:39:41 2025 +0800

    Pipe: Setup PipeLogger to reduce the selected pipe logs & Switched some 
useless logger for pipe in configNode to debug level (#16296)
    
    * test
    
    * fix
    
    * furthur-reduce
    
    * fix
---
 .../handlers/rpc/PipeHeartbeatRPCHandler.java      |  2 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  2 +-
 .../coordinator/task/PipeTaskCoordinatorLock.java  | 10 +--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  2 +
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |  3 +
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 23 +++---
 .../resource/log/PipePeriodicalLogReducer.java     | 91 ++++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryManager.java | 27 ++++---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 20 +++--
 .../PipeTransferTabletBatchEventHandler.java       | 10 ++-
 .../PipeTransferTabletInsertionEventHandler.java   | 10 ++-
 .../async/handler/PipeTransferTsFileHandler.java   | 19 +++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 83 +++++++++++++-------
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 19 +++--
 .../iotdb/commons/pipe/config/PipeConfig.java      | 11 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 10 +++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  3 +-
 .../commons/pipe/resource/log/PipeLogger.java      | 58 ++++++++++++++
 .../pipe/sink/client/IoTDBSyncClientManager.java   | 10 ++-
 19 files changed, 322 insertions(+), 91 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
index e5fa157961d..ea9b68e7dc0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -48,7 +48,7 @@ public class PipeHeartbeatRPCHandler extends 
DataNodeAsyncRequestRPCHandler<TPip
     // Put response
     responseMap.put(requestId, response);
     nodeLocationMap.remove(requestId);
-    LOGGER.info("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
+    LOGGER.debug("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
 
     // Always CountDown
     countDownLatch.countDown();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 120ddb65f86..ef9d0f50409 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -92,7 +92,7 @@ public class PipeHeartbeatScheduler {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
     final TPipeHeartbeatReq request = new 
TPipeHeartbeatReq(System.currentTimeMillis());
-    LOGGER.info("Collecting pipe heartbeat {} from data nodes", 
request.heartbeatId);
+    LOGGER.debug("Collecting pipe heartbeat {} from data nodes", 
request.heartbeatId);
 
     final DataNodeAsyncRequestContext<TPipeHeartbeatReq, TPipeHeartbeatResp> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 535cf022836..e57add9a001 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -41,12 +41,12 @@ public class PipeTaskCoordinatorLock {
   public void lock() {
     try {
       final long id = idGenerator.incrementAndGet();
-      LOGGER.info(
+      LOGGER.debug(
           "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
           id,
           Thread.currentThread().getName());
       deque.put(id);
-      LOGGER.info(
+      LOGGER.debug(
           "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
           id,
           Thread.currentThread().getName());
@@ -61,12 +61,12 @@ public class PipeTaskCoordinatorLock {
   public boolean tryLock() {
     try {
       final long id = idGenerator.incrementAndGet();
-      LOGGER.info(
+      LOGGER.debug(
           "PipeTaskCoordinator lock (id: {}) waiting for thread {}",
           id,
           Thread.currentThread().getName());
       if (deque.offer(id, 10, TimeUnit.SECONDS)) {
-        LOGGER.info(
+        LOGGER.debug(
             "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
             id,
             Thread.currentThread().getName());
@@ -94,7 +94,7 @@ public class PipeTaskCoordinatorLock {
           "PipeTaskCoordinator lock released by thread {} but the lock is not 
acquired by any thread",
           Thread.currentThread().getName());
     } else {
-      LOGGER.info(
+      LOGGER.debug(
           "PipeTaskCoordinator lock (id: {}) released by thread {}",
           id,
           Thread.currentThread().getName());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 738c8f58ec6..80b0ec98986 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
 import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -2405,6 +2406,7 @@ public class IoTDBDescriptor {
 
   private void loadPipeHotModifiedProp(TrimProperties properties) throws 
IOException {
     PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, 
true);
+    PipePeriodicalLogReducer.update();
   }
 
   @SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 67621e3033a..0b4a7e46932 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -31,12 +31,14 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
@@ -83,6 +85,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
 
     IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath);
     IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
+    PipeLogger.setLogger(PipePeriodicalLogReducer::log);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index a2503fec8a7..4d3644aabc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -450,14 +451,15 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         logger.ifPresent(
             l ->
-                l.info(
-                    "Reporting pipe meta: {}, isCompleted: {}, 
remainingEventCount: {}, estimatedRemainingTime: {}",
+                PipeLogger.log(
+                    l::info,
+                    "Reporting pipe meta: %s, isCompleted: %s, 
remainingEventCount: %s",
                     pipeMeta.coreReportMessage(),
                     isCompleted,
-                    remainingEventAndTime.getLeft(),
-                    remainingEventAndTime.getRight()));
+                    remainingEventAndTime.getLeft()));
       }
-      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
+      logger.ifPresent(
+          l -> PipeLogger.log(l::info, "Reported %s pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
@@ -523,14 +525,15 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         logger.ifPresent(
             l ->
-                l.info(
-                    "Reporting pipe meta: {}, isCompleted: {}, 
remainingEventCount: {}, estimatedRemainingTime: {}",
+                PipeLogger.log(
+                    l::info,
+                    "Reporting pipe meta: %s, isCompleted: %s, 
remainingEventCount: %s",
                     pipeMeta.coreReportMessage(),
                     isCompleted,
-                    remainingEventAndTime.getLeft(),
-                    remainingEventAndTime.getRight()));
+                    remainingEventAndTime.getLeft()));
       }
-      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
+      logger.ifPresent(
+          l -> PipeLogger.log(l::info, "Reported %s pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
new file mode 100644
index 00000000000..3f5a013320d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class PipePeriodicalLogReducer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalLogReducer.class);
+  private static final PipeMemoryBlock block;
+  protected static final Cache<String, String> loggerCache;
+
+  static {
+    // Never close because it's static
+    block =
+        PipeDataNodeResourceManager.memory()
+            
.tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
+    loggerCache =
+        Caffeine.newBuilder()
+            .expireAfterWrite(
+                
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), 
TimeUnit.SECONDS)
+            .weigher(
+                (k, v) ->
+                    Math.toIntExact(
+                        RamUsageEstimator.sizeOf((String) k)
+                            + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
+            .maximumWeight(block.getMemoryUsageInBytes())
+            .build();
+  }
+
+  public static boolean log(
+      final Consumer<String> loggerFunction, final String rawMessage, final 
Object... formatter) {
+    final String loggerMessage = String.format(rawMessage, formatter);
+    if (!loggerCache.asMap().containsKey(loggerMessage)) {
+      loggerCache.put(loggerMessage, loggerMessage);
+      loggerFunction.accept(loggerMessage);
+      return true;
+    }
+    return false;
+  }
+
+  public static void update() {
+    loggerCache
+        .policy()
+        .expireAfterWrite()
+        .ifPresent(
+            time ->
+                time.setExpiresAfter(
+                    
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
+                    TimeUnit.SECONDS));
+    PipeDataNodeResourceManager.memory()
+        .resize(block, 
PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false);
+    LOGGER.info(
+        "PipePeriodicalLogReducer is allocated to {} bytes.", 
block.getMemoryUsageInBytes());
+    loggerCache
+        .policy()
+        .eviction()
+        .ifPresent(eviction -> 
eviction.setMaximum(block.getMemoryUsageInBytes()));
+  }
+
+  private PipePeriodicalLogReducer() {
+    // static
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index aad17113a74..6e43d4bf53a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -299,7 +299,12 @@ public class PipeMemoryManager {
             sizeInBytes));
   }
 
-  public synchronized void forceResize(PipeMemoryBlock block, long targetSize) 
{
+  public void forceResize(final PipeMemoryBlock block, final long targetSize) {
+    resize(block, targetSize, true);
+  }
+
+  public synchronized void resize(
+      final PipeMemoryBlock block, final long targetSize, final boolean force) 
{
     if (block == null || block.isReleased()) {
       LOGGER.warn("forceResize: cannot resize a null or released memory 
block");
       return;
@@ -360,15 +365,17 @@ public class PipeMemoryManager {
       }
     }
 
-    throw new PipeRuntimeOutOfMemoryCriticalException(
-        String.format(
-            "forceResize: failed to allocate memory after %d retries, "
-                + "total memory size %d bytes, used memory size %d bytes, "
-                + "requested memory size %d bytes",
-            memoryAllocateMaxRetries,
-            getTotalNonFloatingMemorySizeInBytes(),
-            memoryBlock.getUsedMemoryInBytes(),
-            sizeInBytes));
+    if (force) {
+      throw new PipeRuntimeOutOfMemoryCriticalException(
+          String.format(
+              "forceResize: failed to allocate memory after %d retries, "
+                  + "total memory size %d bytes, used memory size %d bytes, "
+                  + "requested memory size %d bytes",
+              memoryAllocateMaxRetries,
+              getTotalNonFloatingMemorySizeInBytes(),
+              memoryBlock.getUsedMemoryInBytes(),
+              sizeInBytes));
+    }
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index a942a8d08cc..56c0316f09e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.client.IoTDBClientManager;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -227,8 +228,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
             resp.set(response);
 
             if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              LOGGER.warn(
-                  "Handshake error with receiver {}:{}, code: {}, message: 
{}.",
+              PipeLogger.log(
+                  LOGGER::warn,
+                  "Handshake error with receiver %s:%s, code: %s, message: 
%s.",
                   targetNodeUrl.getIp(),
                   targetNodeUrl.getPort(),
                   response.getStatus().getCode(),
@@ -254,11 +256,12 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
           @Override
           public void onError(final Exception e) {
-            LOGGER.warn(
-                "Handshake error with receiver {}:{}.",
+            PipeLogger.log(
+                LOGGER::warn,
+                e,
+                "Handshake error with receiver %s:%s.",
                 targetNodeUrl.getIp(),
-                targetNodeUrl.getPort(),
-                e);
+                targetNodeUrl.getPort());
             exception.set(e);
 
             isHandshakeFinished.set(true);
@@ -296,8 +299,9 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       // Retry to handshake by PipeTransferHandshakeV1Req.
       if (resp.get() != null
           && resp.get().getStatus().getCode() == 
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
-        LOGGER.warn(
-            "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} 
"
+        PipeLogger.log(
+            LOGGER::warn,
+            "Handshake error by PipeTransferHandshakeV2Req with receiver %s:%s 
"
                 + "retry to handshake by PipeTransferHandshakeV1Req.",
             targetNodeUrl.getIp(),
             targetNodeUrl.getPort());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 90ab8bcfa55..89baaa02794 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.db.pipe.sink.util.cacher.LeaderCacheUtils;
@@ -115,11 +116,12 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
   @Override
   protected void onErrorInternal(final Exception exception) {
     try {
-      LOGGER.warn(
-          "Failed to transfer TabletInsertionEvent batch. Total failed events: 
{}, related pipe names: {}",
+      PipeLogger.log(
+          LOGGER::warn,
+          exception,
+          "Failed to transfer TabletInsertionEvent batch. Total failed events: 
%s, related pipe names: %s",
           events.size(),
-          
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()),
-          exception);
+          
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
       connector.addFailureEventsToRetryQueue(events);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 07e11f64624..df26325a5ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -89,12 +90,13 @@ public abstract class 
PipeTransferTabletInsertionEventHandler extends PipeTransf
   @Override
   protected void onErrorInternal(final Exception exception) {
     try {
-      LOGGER.warn(
-          "Failed to transfer TabletInsertionEvent {} (committer key={}, 
commit id={}).",
+      PipeLogger.log(
+          LOGGER::warn,
+          exception,
+          "Failed to transfer TabletInsertionEvent %s (committer key=%s, 
commit id=%s).",
           event.coreReportMessage(),
           event.getCommitterKey(),
-          event.getCommitId(),
-          exception);
+          event.getCommitId());
     } finally {
       connector.addFailureEventToRetryQueue(event);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 5868a23fc24..7144bbe5438 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -368,17 +369,19 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   protected void onErrorInternal(final Exception exception) {
     try {
       if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
-        LOGGER.warn(
-            "Failed to transfer TsFileInsertionEvent {} (committer key {}, 
commit id {}).",
+        PipeLogger.log(
+            LOGGER::warn,
+            exception,
+            "Failed to transfer TsFileInsertionEvent %s (committer key %s, 
commit id %s).",
             tsFile,
             
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
-            
events.stream().map(EnrichedEvent::getCommitIds).collect(Collectors.toList()),
-            exception);
+            
events.stream().map(EnrichedEvent::getCommitIds).collect(Collectors.toList()));
       } else {
-        LOGGER.warn(
-            "Failed to transfer TsFileInsertionEvent {} (batched 
TableInsertionEvents)",
-            tsFile,
-            exception);
+        PipeLogger.log(
+            LOGGER::warn,
+            exception,
+            "Failed to transfer TsFileInsertionEvent %s (batched 
TableInsertionEvents).",
+            tsFile);
       }
     } catch (final Exception e) {
       LOGGER.warn("Failed to log error when failed to transfer file.", e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 80bd46e3166..61212fbd5a2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -297,34 +297,38 @@ public class CommonConfig {
 
   private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
   private boolean pipeReceiverLoadConversionEnabled = false;
-
-  private double pipeMetaReportMaxLogNumPerRound = 0.1;
-  private int pipeMetaReportMaxLogIntervalRounds = 360;
-  private int pipeTsFilePinMaxLogNumPerRound = 10;
-  private int pipeTsFilePinMaxLogIntervalRounds = 90;
-
-  private boolean pipeMemoryManagementEnabled = true;
-  private long pipeMemoryAllocateRetryIntervalMs = 50;
-  private int pipeMemoryAllocateMaxRetries = 10;
-  private long pipeMemoryAllocateMinSizeInBytes = 32;
-  private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 
1024 * 1024; // 2MB
-  private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+  private volatile long pipePeriodicalLogMinIntervalSeconds = 60;
+  private volatile long pipeLoggerCacheMaxSizeInBytes = 10 * MB;
+
+  private volatile double pipeMetaReportMaxLogNumPerRound = 0.1;
+  private volatile int pipeMetaReportMaxLogIntervalRounds = 360;
+  private volatile int pipeTsFilePinMaxLogNumPerRound = 10;
+  private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
+
+  private volatile boolean pipeMemoryManagementEnabled = true;
+  private volatile long pipeMemoryAllocateRetryIntervalMs = 50;
+  private volatile int pipeMemoryAllocateMaxRetries = 10;
+  private volatile long pipeMemoryAllocateMinSizeInBytes = 32;
+  private volatile long pipeMemoryAllocateForTsFileSequenceReaderInBytes =
+      (long) 2 * 1024 * 1024; // 2MB
+  private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 
3Min
   private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
-  private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
-  private long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 * 1024 * 
1024; // 16MB;
-  private long pipeListeningQueueTransferSnapshotThreshold = 1000;
-  private int pipeSnapshotExecutionMaxBatchSize = 1000;
-  private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
-  private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = 
PipeRateAverage.FIVE_MINUTES;
-  private double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
-  private double pipeTsFileScanParsingThreshold = 0.05;
-  private double pipeDynamicMemoryHistoryWeight = 0.5;
-  private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
-  private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 
0.1d;
-  private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
-  private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold 
= 0.8d;
-  private boolean pipeTransferTsFileSync = false;
-  private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 
minutes
+  private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+  private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 * 
1024 * 1024; // 16MB;
+  private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
+  private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
+  private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
+  private volatile PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
+      PipeRateAverage.FIVE_MINUTES;
+  private volatile double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
+  private volatile double pipeTsFileScanParsingThreshold = 0.05;
+  private volatile double pipeDynamicMemoryHistoryWeight = 0.5;
+  private volatile double pipeDynamicMemoryAdjustmentThreshold = 0.05;
+  private volatile double 
pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 0.1d;
+  private volatile double pipeThresholdAllocationStrategyLowUsageThreshold = 
0.2d;
+  private volatile double 
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
+  private volatile boolean pipeTransferTsFileSync = false;
+  private volatile long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 
1000L; // 5 minutes
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1556,6 +1560,31 @@ public class CommonConfig {
     logger.info("pipeReceiverConversionEnabled is set to {}.", 
pipeReceiverLoadConversionEnabled);
   }
 
+  public long getPipePeriodicalLogMinIntervalSeconds() {
+    return pipePeriodicalLogMinIntervalSeconds;
+  }
+
+  public void setPipePeriodicalLogMinIntervalSeconds(long 
pipePeriodicalLogMinIntervalSeconds) {
+    if (this.pipePeriodicalLogMinIntervalSeconds == 
pipePeriodicalLogMinIntervalSeconds) {
+      return;
+    }
+    this.pipePeriodicalLogMinIntervalSeconds = 
pipePeriodicalLogMinIntervalSeconds;
+    logger.info(
+        "pipePeriodicalLogMinIntervalSeconds is set to {}.", 
pipePeriodicalLogMinIntervalSeconds);
+  }
+
+  public long getPipeLoggerCacheMaxSizeInBytes() {
+    return pipeLoggerCacheMaxSizeInBytes;
+  }
+
+  public void setPipeLoggerCacheMaxSizeInBytes(long 
pipeLoggerCacheMaxSizeInBytes) {
+    if (this.pipeLoggerCacheMaxSizeInBytes == pipeLoggerCacheMaxSizeInBytes) {
+      return;
+    }
+    this.pipeLoggerCacheMaxSizeInBytes = pipeLoggerCacheMaxSizeInBytes;
+    logger.info("pipeLoggerCacheMaxSizeInBytes is set to {}.", 
pipeLoggerCacheMaxSizeInBytes);
+  }
+
   public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
     return pipeReceiverReqDecompressedMaxLengthInBytes;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 612704318aa..88ea0941443 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
@@ -395,7 +396,7 @@ public abstract class PipeTaskAgent {
               String.format(
                   "Failed to handle pipe meta changes for %s, because %s",
                   pipeName, e.getMessage());
-          LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, 
e);
+          PipeLogger.log(LOGGER::warn, e, "Failed to handle pipe meta changes 
for %s", pipeName);
           exceptionMessages.add(
               new TPushPipeMetaRespExceptionMessage(
                   pipeName, errorMessage, System.currentTimeMillis()));
@@ -425,7 +426,7 @@ public abstract class PipeTaskAgent {
         final String errorMessage =
             String.format(
                 "Failed to handle pipe meta changes for %s, because %s", 
pipeName, e.getMessage());
-        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+        PipeLogger.log(LOGGER::warn, e, "Failed to handle pipe meta changes 
for %s", pipeName);
         exceptionMessages.add(
             new TPushPipeMetaRespExceptionMessage(
                 pipeName, errorMessage, System.currentTimeMillis()));
@@ -1040,9 +1041,10 @@ public abstract class PipeTaskAgent {
                               reusedConnectorParameters2ExceptionMap.get(
                                   staticMeta.getSinkParameters());
                           pipeTaskMeta.trackExceptionMessage(exception);
-                          LOGGER.warn(
-                              "Pipe {} (creation time = {}) will be stopped 
because of critical exception "
-                                  + "(occurred time {}) in connector {}.",
+                          PipeLogger.log(
+                              LOGGER::warn,
+                              "Pipe %s (creation time = %s) will be stopped 
because of critical exception "
+                                  + "(occurred time %s) in connector %s.",
                               staticMeta.getPipeName(),
                               staticMeta.getCreationTime(),
                               exception.getTimeStamp(),
@@ -1068,9 +1070,10 @@ public abstract class PipeTaskAgent {
                           for (final PipeRuntimeException e : 
pipeTaskMeta.getExceptionMessages()) {
                             if (e instanceof PipeRuntimeCriticalException) {
                               stopPipe(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
-                              LOGGER.warn(
-                                  "Pipe {} (creation time = {}) was stopped 
because of critical exception "
-                                      + "(occurred time {}).",
+                              PipeLogger.log(
+                                  LOGGER::warn,
+                                  "Pipe %s (creation time = %s) was stopped 
because of critical exception "
+                                      + "(occurred time %s).",
                                   staticMeta.getPipeName(),
                                   staticMeta.getCreationTime(),
                                   e.getTimeStamp());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 68f2d59b673..887d89b0279 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -347,6 +347,14 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
   }
 
+  public long getPipePeriodicalLogMinIntervalSeconds() {
+    return COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
+  }
+
+  public long getPipeLoggerCacheMaxSizeInBytes() {
+    return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+  }
+
   /////////////////////////////// Logger ///////////////////////////////
 
   public double getPipeMetaReportMaxLogNumPerRound() {
@@ -580,6 +588,9 @@ public class PipeConfig {
         "PipeReceiverReqDecompressedMaxLengthInBytes: {}",
         getPipeReceiverReqDecompressedMaxLengthInBytes());
     LOGGER.info("PipeReceiverLoadConversionEnabled: {}", 
isPipeReceiverLoadConversionEnabled());
+    LOGGER.info(
+        "PipePeriodicalLogMinIntervalSeconds: {}", 
getPipePeriodicalLogMinIntervalSeconds());
+    LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}", 
getPipeLoggerCacheMaxSizeInBytes());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a60ac57da02..9087db068ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -437,6 +437,16 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_receiver_load_conversion_enabled",
                 
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
+    config.setPipePeriodicalLogMinIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_periodical_log_min_interval_seconds",
+                
String.valueOf(config.getPipePeriodicalLogMinIntervalSeconds()))));
+    config.setPipeLoggerCacheMaxSizeInBytes(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_logger_cache_max_size_in_bytes",
+                String.valueOf(config.getPipeLoggerCacheMaxSizeInBytes()))));
 
     config.setPipeMemoryAllocateMaxRetries(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 99f06be4c39..62b1dc26b3d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -108,7 +109,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                       + "connector's timestamp precision %s. Validation 
fails.",
                   
CommonDescriptor.getInstance().getConfig().getTimestampPrecision(),
                   req.getTimestampPrecision()));
-      LOGGER.warn("Handshake failed, response status = {}.", status);
+      PipeLogger.log(LOGGER::warn, "Handshake failed, response status = %s.", 
status);
       return new TPipeTransferResp(status);
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
new file mode 100644
index 00000000000..70b494da032
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.log;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.function.Consumer;
+
+public class PipeLogger {
+  private static PipePeriodicalLogger logger =
+      (loggerFunction, rawMessage, formatter) ->
+          loggerFunction.accept(String.format(rawMessage, formatter));
+
+  public static void log(
+      final Consumer<String> loggerFunction, final String rawMessage, final 
Object... formatter) {
+    logger.log(loggerFunction, rawMessage, formatter);
+  }
+
+  public static void log(
+      final Consumer<String> loggerFunction,
+      final Throwable throwable,
+      final String rawMessage,
+      final Object... formatter) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    throwable.printStackTrace(new PrintStream(out));
+    logger.log(loggerFunction, rawMessage + "\n" + out, formatter);
+  }
+
+  public static void setLogger(final PipePeriodicalLogger logger) {
+    PipeLogger.logger = logger;
+  }
+
+  private PipeLogger() {
+    // static
+  }
+
+  @FunctionalInterface
+  public interface PipePeriodicalLogger {
+    void log(final Consumer<String> loggerFunction, final String rawMessage, 
final Object... args);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index befc273a364..7cfe8f18712 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV2Req;
@@ -206,12 +207,13 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       return true;
     } catch (Exception e) {
       endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage());
-      LOGGER.warn(
-          "Failed to initialize client with target server ip: {}, port: {}, 
because {}",
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Failed to initialize client with target server ip: %s, port: %s, 
because %s",
           endPoint.getIp(),
           endPoint.getPort(),
-          e.getMessage(),
-          e);
+          e.getMessage());
       return false;
     }
   }


Reply via email to