This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c539697c7e Pipe: Setup PipeLogger to reduce the selected pipe logs &
Switched some useless logger for pipe in configNode to debug level (#16296)
1c539697c7e is described below
commit 1c539697c7e56870fb798307ac6b85c5a509f898
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 29 09:39:41 2025 +0800
Pipe: Setup PipeLogger to reduce the selected pipe logs & Switched some
useless logger for pipe in configNode to debug level (#16296)
* test
* fix
* further-reduce
* fix
---
.../handlers/rpc/PipeHeartbeatRPCHandler.java | 2 +-
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 2 +-
.../coordinator/task/PipeTaskCoordinatorLock.java | 10 +--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 3 +
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 23 +++---
.../resource/log/PipePeriodicalLogReducer.java | 91 ++++++++++++++++++++++
.../db/pipe/resource/memory/PipeMemoryManager.java | 27 ++++---
.../client/IoTDBDataNodeAsyncClientManager.java | 20 +++--
.../PipeTransferTabletBatchEventHandler.java | 10 ++-
.../PipeTransferTabletInsertionEventHandler.java | 10 ++-
.../async/handler/PipeTransferTsFileHandler.java | 19 +++--
.../apache/iotdb/commons/conf/CommonConfig.java | 83 +++++++++++++-------
.../commons/pipe/agent/task/PipeTaskAgent.java | 19 +++--
.../iotdb/commons/pipe/config/PipeConfig.java | 11 +++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 3 +-
.../commons/pipe/resource/log/PipeLogger.java | 58 ++++++++++++++
.../pipe/sink/client/IoTDBSyncClientManager.java | 10 ++-
19 files changed, 322 insertions(+), 91 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
index e5fa157961d..ea9b68e7dc0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -48,7 +48,7 @@ public class PipeHeartbeatRPCHandler extends
DataNodeAsyncRequestRPCHandler<TPip
// Put response
responseMap.put(requestId, response);
nodeLocationMap.remove(requestId);
- LOGGER.info("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
+ LOGGER.debug("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
// Always CountDown
countDownLatch.countDown();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 120ddb65f86..ef9d0f50409 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -92,7 +92,7 @@ public class PipeHeartbeatScheduler {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
final TPipeHeartbeatReq request = new
TPipeHeartbeatReq(System.currentTimeMillis());
- LOGGER.info("Collecting pipe heartbeat {} from data nodes",
request.heartbeatId);
+ LOGGER.debug("Collecting pipe heartbeat {} from data nodes",
request.heartbeatId);
final DataNodeAsyncRequestContext<TPipeHeartbeatReq, TPipeHeartbeatResp>
clientHandler =
new DataNodeAsyncRequestContext<>(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 535cf022836..e57add9a001 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -41,12 +41,12 @@ public class PipeTaskCoordinatorLock {
public void lock() {
try {
final long id = idGenerator.incrementAndGet();
- LOGGER.info(
+ LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
id,
Thread.currentThread().getName());
deque.put(id);
- LOGGER.info(
+ LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
@@ -61,12 +61,12 @@ public class PipeTaskCoordinatorLock {
public boolean tryLock() {
try {
final long id = idGenerator.incrementAndGet();
- LOGGER.info(
+ LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) waiting for thread {}",
id,
Thread.currentThread().getName());
if (deque.offer(id, 10, TimeUnit.SECONDS)) {
- LOGGER.info(
+ LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
@@ -94,7 +94,7 @@ public class PipeTaskCoordinatorLock {
"PipeTaskCoordinator lock released by thread {} but the lock is not
acquired by any thread",
Thread.currentThread().getName());
} else {
- LOGGER.info(
+ LOGGER.debug(
"PipeTaskCoordinator lock (id: {}) released by thread {}",
id,
Thread.currentThread().getName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 738c8f58ec6..80b0ec98986 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -2405,6 +2406,7 @@ public class IoTDBDescriptor {
private void loadPipeHotModifiedProp(TrimProperties properties) throws
IOException {
PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties,
true);
+ PipePeriodicalLogReducer.update();
}
@SuppressWarnings("squid:S3518") // "proportionSum" can't be zero
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 67621e3033a..0b4a7e46932 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -31,12 +31,14 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
@@ -83,6 +85,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath);
IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
+ PipeLogger.setLogger(PipePeriodicalLogReducer::log);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index a2503fec8a7..4d3644aabc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -450,14 +451,15 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
logger.ifPresent(
l ->
- l.info(
- "Reporting pipe meta: {}, isCompleted: {},
remainingEventCount: {}, estimatedRemainingTime: {}",
+ PipeLogger.log(
+ l::info,
+ "Reporting pipe meta: %s, isCompleted: %s,
remainingEventCount: %s",
pipeMeta.coreReportMessage(),
isCompleted,
- remainingEventAndTime.getLeft(),
- remainingEventAndTime.getRight()));
+ remainingEventAndTime.getLeft()));
}
- logger.ifPresent(l -> l.info("Reported {} pipe metas.",
pipeMetaBinaryList.size()));
+ logger.ifPresent(
+ l -> PipeLogger.log(l::info, "Reported %s pipe metas.",
pipeMetaBinaryList.size()));
} catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
@@ -523,14 +525,15 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
logger.ifPresent(
l ->
- l.info(
- "Reporting pipe meta: {}, isCompleted: {},
remainingEventCount: {}, estimatedRemainingTime: {}",
+ PipeLogger.log(
+ l::info,
+ "Reporting pipe meta: %s, isCompleted: %s,
remainingEventCount: %s",
pipeMeta.coreReportMessage(),
isCompleted,
- remainingEventAndTime.getLeft(),
- remainingEventAndTime.getRight()));
+ remainingEventAndTime.getLeft()));
}
- logger.ifPresent(l -> l.info("Reported {} pipe metas.",
pipeMetaBinaryList.size()));
+ logger.ifPresent(
+ l -> PipeLogger.log(l::info, "Reported %s pipe metas.",
pipeMetaBinaryList.size()));
} catch (final IOException | IllegalPathException e) {
throw new TException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
new file mode 100644
index 00000000000..3f5a013320d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.log;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class PipePeriodicalLogReducer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalLogReducer.class);
+ private static final PipeMemoryBlock block;
+ protected static final Cache<String, String> loggerCache;
+
+ static {
+ // Never close because it's static
+ block =
+ PipeDataNodeResourceManager.memory()
+
.tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
+ loggerCache =
+ Caffeine.newBuilder()
+ .expireAfterWrite(
+
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
TimeUnit.SECONDS)
+ .weigher(
+ (k, v) ->
+ Math.toIntExact(
+ RamUsageEstimator.sizeOf((String) k)
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
+ .maximumWeight(block.getMemoryUsageInBytes())
+ .build();
+ }
+
+ public static boolean log(
+ final Consumer<String> loggerFunction, final String rawMessage, final
Object... formatter) {
+ final String loggerMessage = String.format(rawMessage, formatter);
+ if (!loggerCache.asMap().containsKey(loggerMessage)) {
+ loggerCache.put(loggerMessage, loggerMessage);
+ loggerFunction.accept(loggerMessage);
+ return true;
+ }
+ return false;
+ }
+
+ public static void update() {
+ loggerCache
+ .policy()
+ .expireAfterWrite()
+ .ifPresent(
+ time ->
+ time.setExpiresAfter(
+
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
+ TimeUnit.SECONDS));
+ PipeDataNodeResourceManager.memory()
+ .resize(block,
PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false);
+ LOGGER.info(
+ "PipePeriodicalLogReducer is allocated to {} bytes.",
block.getMemoryUsageInBytes());
+ loggerCache
+ .policy()
+ .eviction()
+ .ifPresent(eviction ->
eviction.setMaximum(block.getMemoryUsageInBytes()));
+ }
+
+ private PipePeriodicalLogReducer() {
+ // static
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index aad17113a74..6e43d4bf53a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -299,7 +299,12 @@ public class PipeMemoryManager {
sizeInBytes));
}
- public synchronized void forceResize(PipeMemoryBlock block, long targetSize)
{
+ public void forceResize(final PipeMemoryBlock block, final long targetSize) {
+ resize(block, targetSize, true);
+ }
+
+ public synchronized void resize(
+ final PipeMemoryBlock block, final long targetSize, final boolean force)
{
if (block == null || block.isReleased()) {
LOGGER.warn("forceResize: cannot resize a null or released memory
block");
return;
@@ -360,15 +365,17 @@ public class PipeMemoryManager {
}
}
- throw new PipeRuntimeOutOfMemoryCriticalException(
- String.format(
- "forceResize: failed to allocate memory after %d retries, "
- + "total memory size %d bytes, used memory size %d bytes, "
- + "requested memory size %d bytes",
- memoryAllocateMaxRetries,
- getTotalNonFloatingMemorySizeInBytes(),
- memoryBlock.getUsedMemoryInBytes(),
- sizeInBytes));
+ if (force) {
+ throw new PipeRuntimeOutOfMemoryCriticalException(
+ String.format(
+ "forceResize: failed to allocate memory after %d retries, "
+ + "total memory size %d bytes, used memory size %d bytes, "
+ + "requested memory size %d bytes",
+ memoryAllocateMaxRetries,
+ getTotalNonFloatingMemorySizeInBytes(),
+ memoryBlock.getUsedMemoryInBytes(),
+ sizeInBytes));
+ }
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index a942a8d08cc..56c0316f09e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBClientManager;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -227,8 +228,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
resp.set(response);
if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "Handshake error with receiver {}:{}, code: {}, message:
{}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Handshake error with receiver %s:%s, code: %s, message:
%s.",
targetNodeUrl.getIp(),
targetNodeUrl.getPort(),
response.getStatus().getCode(),
@@ -254,11 +256,12 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
@Override
public void onError(final Exception e) {
- LOGGER.warn(
- "Handshake error with receiver {}:{}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Handshake error with receiver %s:%s.",
targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- e);
+ targetNodeUrl.getPort());
exception.set(e);
isHandshakeFinished.set(true);
@@ -296,8 +299,9 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
// Retry to handshake by PipeTransferHandshakeV1Req.
if (resp.get() != null
&& resp.get().getStatus().getCode() ==
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
- LOGGER.warn(
- "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{}
"
+ PipeLogger.log(
+ LOGGER::warn,
+ "Handshake error by PipeTransferHandshakeV2Req with receiver %s:%s
"
+ "retry to handshake by PipeTransferHandshakeV1Req.",
targetNodeUrl.getIp(),
targetNodeUrl.getPort());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 90ab8bcfa55..89baaa02794 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.db.pipe.sink.util.cacher.LeaderCacheUtils;
@@ -115,11 +116,12 @@ public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHa
@Override
protected void onErrorInternal(final Exception exception) {
try {
- LOGGER.warn(
- "Failed to transfer TabletInsertionEvent batch. Total failed events:
{}, related pipe names: {}",
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TabletInsertionEvent batch. Total failed events:
%s, related pipe names: %s",
events.size(),
-
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()),
- exception);
+
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
} finally {
connector.addFailureEventsToRetryQueue(events);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 07e11f64624..df26325a5ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -89,12 +90,13 @@ public abstract class
PipeTransferTabletInsertionEventHandler extends PipeTransf
@Override
protected void onErrorInternal(final Exception exception) {
try {
- LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (committer key={},
commit id={}).",
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TabletInsertionEvent %s (committer key=%s,
commit id=%s).",
event.coreReportMessage(),
event.getCommitterKey(),
- event.getCommitId(),
- exception);
+ event.getCommitId());
} finally {
connector.addFailureEventToRetryQueue(event);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 5868a23fc24..7144bbe5438 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -368,17 +369,19 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
protected void onErrorInternal(final Exception exception) {
try {
if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
- LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (committer key {},
commit id {}).",
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TsFileInsertionEvent %s (committer key %s,
commit id %s).",
tsFile,
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
-
events.stream().map(EnrichedEvent::getCommitIds).collect(Collectors.toList()),
- exception);
+
events.stream().map(EnrichedEvent::getCommitIds).collect(Collectors.toList()));
} else {
- LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (batched
TableInsertionEvents)",
- tsFile,
- exception);
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TsFileInsertionEvent %s (batched
TableInsertionEvents).",
+ tsFile);
}
} catch (final Exception e) {
LOGGER.warn("Failed to log error when failed to transfer file.", e);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 80bd46e3166..61212fbd5a2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -297,34 +297,38 @@ public class CommonConfig {
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
private boolean pipeReceiverLoadConversionEnabled = false;
-
- private double pipeMetaReportMaxLogNumPerRound = 0.1;
- private int pipeMetaReportMaxLogIntervalRounds = 360;
- private int pipeTsFilePinMaxLogNumPerRound = 10;
- private int pipeTsFilePinMaxLogIntervalRounds = 90;
-
- private boolean pipeMemoryManagementEnabled = true;
- private long pipeMemoryAllocateRetryIntervalMs = 50;
- private int pipeMemoryAllocateMaxRetries = 10;
- private long pipeMemoryAllocateMinSizeInBytes = 32;
- private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 *
1024 * 1024; // 2MB
- private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+ private volatile long pipePeriodicalLogMinIntervalSeconds = 60;
+ private volatile long pipeLoggerCacheMaxSizeInBytes = 10 * MB;
+
+ private volatile double pipeMetaReportMaxLogNumPerRound = 0.1;
+ private volatile int pipeMetaReportMaxLogIntervalRounds = 360;
+ private volatile int pipeTsFilePinMaxLogNumPerRound = 10;
+ private volatile int pipeTsFilePinMaxLogIntervalRounds = 90;
+
+ private volatile boolean pipeMemoryManagementEnabled = true;
+ private volatile long pipeMemoryAllocateRetryIntervalMs = 50;
+ private volatile int pipeMemoryAllocateMaxRetries = 10;
+ private volatile long pipeMemoryAllocateMinSizeInBytes = 32;
+ private volatile long pipeMemoryAllocateForTsFileSequenceReaderInBytes =
+ (long) 2 * 1024 * 1024; // 2MB
+ private volatile long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; //
3Min
private volatile long pipeCheckMemoryEnoughIntervalMs = 10L;
- private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
- private long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 * 1024 *
1024; // 16MB;
- private long pipeListeningQueueTransferSnapshotThreshold = 1000;
- private int pipeSnapshotExecutionMaxBatchSize = 1000;
- private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
- private PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeRateAverage.FIVE_MINUTES;
- private double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
- private double pipeTsFileScanParsingThreshold = 0.05;
- private double pipeDynamicMemoryHistoryWeight = 0.5;
- private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
- private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
0.1d;
- private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
- private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
= 0.8d;
- private boolean pipeTransferTsFileSync = false;
- private long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5
minutes
+ private volatile float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+ private volatile long pipeMaxAlignedSeriesChunkSizeInOneBatch = (long) 16 *
1024 * 1024; // 16MB;
+ private volatile long pipeListeningQueueTransferSnapshotThreshold = 1000;
+ private volatile int pipeSnapshotExecutionMaxBatchSize = 1000;
+ private volatile long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
+ private volatile PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
+ PipeRateAverage.FIVE_MINUTES;
+ private volatile double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
+ private volatile double pipeTsFileScanParsingThreshold = 0.05;
+ private volatile double pipeDynamicMemoryHistoryWeight = 0.5;
+ private volatile double pipeDynamicMemoryAdjustmentThreshold = 0.05;
+ private volatile double
pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 0.1d;
+ private volatile double pipeThresholdAllocationStrategyLowUsageThreshold =
0.2d;
+ private volatile double
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
+ private volatile boolean pipeTransferTsFileSync = false;
+ private volatile long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 *
1000L; // 5 minutes
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1556,6 +1560,31 @@ public class CommonConfig {
logger.info("pipeReceiverConversionEnabled is set to {}.",
pipeReceiverLoadConversionEnabled);
}
+ public long getPipePeriodicalLogMinIntervalSeconds() {
+ return pipePeriodicalLogMinIntervalSeconds;
+ }
+
+ public void setPipePeriodicalLogMinIntervalSeconds(long
pipePeriodicalLogMinIntervalSeconds) {
+ if (this.pipePeriodicalLogMinIntervalSeconds ==
pipePeriodicalLogMinIntervalSeconds) {
+ return;
+ }
+ this.pipePeriodicalLogMinIntervalSeconds =
pipePeriodicalLogMinIntervalSeconds;
+ logger.info(
+ "pipePeriodicalLogMinIntervalSeconds is set to {}.",
pipePeriodicalLogMinIntervalSeconds);
+ }
+
+ public long getPipeLoggerCacheMaxSizeInBytes() {
+ return pipeLoggerCacheMaxSizeInBytes;
+ }
+
+ public void setPipeLoggerCacheMaxSizeInBytes(long
pipeLoggerCacheMaxSizeInBytes) {
+ if (this.pipeLoggerCacheMaxSizeInBytes == pipeLoggerCacheMaxSizeInBytes) {
+ return;
+ }
+ this.pipeLoggerCacheMaxSizeInBytes = pipeLoggerCacheMaxSizeInBytes;
+ logger.info("pipeLoggerCacheMaxSizeInBytes is set to {}.",
pipeLoggerCacheMaxSizeInBytes);
+ }
+
public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 612704318aa..88ea0941443 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
@@ -395,7 +396,7 @@ public abstract class PipeTaskAgent {
String.format(
"Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
- LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName,
e);
+ PipeLogger.log(LOGGER::warn, e, "Failed to handle pipe meta changes
for %s", pipeName);
exceptionMessages.add(
new TPushPipeMetaRespExceptionMessage(
pipeName, errorMessage, System.currentTimeMillis()));
@@ -425,7 +426,7 @@ public abstract class PipeTaskAgent {
final String errorMessage =
String.format(
"Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
- LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+ PipeLogger.log(LOGGER::warn, e, "Failed to handle pipe meta changes
for %s", pipeName);
exceptionMessages.add(
new TPushPipeMetaRespExceptionMessage(
pipeName, errorMessage, System.currentTimeMillis()));
@@ -1040,9 +1041,10 @@ public abstract class PipeTaskAgent {
reusedConnectorParameters2ExceptionMap.get(
staticMeta.getSinkParameters());
pipeTaskMeta.trackExceptionMessage(exception);
- LOGGER.warn(
- "Pipe {} (creation time = {}) will be stopped
because of critical exception "
- + "(occurred time {}) in connector {}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Pipe %s (creation time = %s) will be stopped
because of critical exception "
+ + "(occurred time %s) in connector %s.",
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
exception.getTimeStamp(),
@@ -1068,9 +1070,10 @@ public abstract class PipeTaskAgent {
for (final PipeRuntimeException e :
pipeTaskMeta.getExceptionMessages()) {
if (e instanceof PipeRuntimeCriticalException) {
stopPipe(staticMeta.getPipeName(),
staticMeta.getCreationTime());
- LOGGER.warn(
- "Pipe {} (creation time = {}) was stopped
because of critical exception "
- + "(occurred time {}).",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Pipe %s (creation time = %s) was stopped
because of critical exception "
+ + "(occurred time %s).",
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
e.getTimeStamp());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 68f2d59b673..887d89b0279 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -347,6 +347,14 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
}
+ /** Returns whatever COMMON_CONFIG reports for pipePeriodicalLogMinIntervalSeconds. */
+ public long getPipePeriodicalLogMinIntervalSeconds() {
+   final long minIntervalSeconds = COMMON_CONFIG.getPipePeriodicalLogMinIntervalSeconds();
+   return minIntervalSeconds;
+ }
+
+ /** Returns whatever COMMON_CONFIG reports for pipeLoggerCacheMaxSizeInBytes. */
+ public long getPipeLoggerCacheMaxSizeInBytes() {
+   final long cacheMaxSizeInBytes = COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes();
+   return cacheMaxSizeInBytes;
+ }
+
/////////////////////////////// Logger ///////////////////////////////
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -580,6 +588,9 @@ public class PipeConfig {
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());
LOGGER.info("PipeReceiverLoadConversionEnabled: {}",
isPipeReceiverLoadConversionEnabled());
+ LOGGER.info(
+ "PipePeriodicalLogMinIntervalSeconds: {}",
getPipePeriodicalLogMinIntervalSeconds());
+ LOGGER.info("PipeLoggerCacheMaxSizeInBytes: {}",
getPipeLoggerCacheMaxSizeInBytes());
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}",
getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}",
getPipeMetaReportMaxLogIntervalRounds());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a60ac57da02..9087db068ed 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -437,6 +437,16 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_receiver_load_conversion_enabled",
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
+ config.setPipePeriodicalLogMinIntervalSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_periodical_log_min_interval_seconds",
+
String.valueOf(config.getPipePeriodicalLogMinIntervalSeconds()))));
+ config.setPipeLoggerCacheMaxSizeInBytes(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_logger_cache_max_size_in_bytes",
+ String.valueOf(config.getPipeLoggerCacheMaxSizeInBytes()))));
config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 99f06be4c39..62b1dc26b3d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -108,7 +109,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
+ "connector's timestamp precision %s. Validation
fails.",
CommonDescriptor.getInstance().getConfig().getTimestampPrecision(),
req.getTimestampPrecision()));
- LOGGER.warn("Handshake failed, response status = {}.", status);
+ PipeLogger.log(LOGGER::warn, "Handshake failed, response status = %s.",
status);
return new TPipeTransferResp(status);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
new file mode 100644
index 00000000000..70b494da032
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.log;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.function.Consumer;
+
+/**
+ * Static entry point for pipe log messages. Every call is forwarded to a replaceable {@link
+ * PipePeriodicalLogger}; the one installed by default formats the message with {@link
+ * String#format(String, Object...)} and passes the result straight to the supplied logger function.
+ */
+public class PipeLogger {
+  private static PipePeriodicalLogger logger =
+      (loggerFunction, rawMessage, formatter) ->
+          loggerFunction.accept(String.format(rawMessage, formatter));
+
+  /**
+   * Logs a message through the currently installed {@link PipePeriodicalLogger}.
+   *
+   * @param loggerFunction sink that receives the final message, e.g. {@code LOGGER::warn}
+   * @param rawMessage message pattern; the default logger interprets it with {@link
+   *     String#format(String, Object...)}
+   * @param formatter arguments referenced by {@code rawMessage}
+   */
+  public static void log(
+      final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
+    logger.log(loggerFunction, rawMessage, formatter);
+  }
+
+  /**
+   * Logs a message followed by a newline and the stack trace of {@code throwable}.
+   *
+   * @param loggerFunction sink that receives the final message, e.g. {@code LOGGER::warn}
+   * @param throwable the exception whose stack trace is appended to the message
+   * @param rawMessage message pattern; the default logger interprets it with {@link
+   *     String#format(String, Object...)}
+   * @param formatter arguments referenced by {@code rawMessage}
+   */
+  public static void log(
+      final Consumer<String> loggerFunction,
+      final Throwable throwable,
+      final String rawMessage,
+      final Object... formatter) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    throwable.printStackTrace(new PrintStream(out));
+    // The trace is appended to the format pattern, so a literal '%' inside it (for example in an
+    // exception message such as "disk 100% full") must be escaped; otherwise String.format would
+    // read it as a conversion and throw instead of logging.
+    // NOTE(review): assumes a logger installed via setLogger also treats rawMessage as a
+    // String.format pattern, as the default one does - confirm against the implementations.
+    final String escapedTrace = out.toString().replace("%", "%%");
+    logger.log(loggerFunction, rawMessage + "\n" + escapedTrace, formatter);
+  }
+
+  /**
+   * Replaces the logger that both {@code log} overloads delegate to.
+   *
+   * @param logger the new delegate
+   */
+  public static void setLogger(final PipePeriodicalLogger logger) {
+    PipeLogger.logger = logger;
+  }
+
+  private PipeLogger() {
+    // static
+  }
+
+  /** Strategy that decides how a pattern plus its arguments reaches the logger function. */
+  @FunctionalInterface
+  public interface PipePeriodicalLogger {
+    void log(final Consumer<String> loggerFunction, final String rawMessage, final Object... args);
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index befc273a364..7cfe8f18712 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV2Req;
@@ -206,12 +207,13 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
return true;
} catch (Exception e) {
endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage());
- LOGGER.warn(
- "Failed to initialize client with target server ip: {}, port: {},
because {}",
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Failed to initialize client with target server ip: %s, port: %s,
because %s",
endPoint.getIp(),
endPoint.getPort(),
- e.getMessage(),
- e);
+ e.getMessage());
return false;
}
}