This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch cp-b616502-dev-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 840961bfaf150769fd07d1d1ad19cf2391d7f6b8 Author: Caideyipi <[email protected]> AuthorDate: Fri May 29 16:13:52 2026 +0800 [To dev/1.3] Pipe Log: Reduce repeatable logs (#17700) Backport the non-i18n log reduction part of b616502aec0b700d0b7f3a1577e9ecee1edc365b. --- .../sink/client/IoTDBDataNodeAsyncClientManager.java | 18 ++++++++++-------- .../PipeConsensusTabletBatchEventHandler.java | 14 +++++++++----- .../PipeConsensusTabletInsertionEventHandler.java | 20 ++++++++++++++------ .../PipeConsensusTsFileInsertionEventHandler.java | 10 ++++++---- .../thrift/async/IoTDBDataRegionAsyncSink.java | 20 +++++++++++--------- .../async/handler/PipeTransferTrackableHandler.java | 4 +++- .../async/handler/PipeTransferTsFileHandler.java | 13 ++++++++----- 7 files changed, 61 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java index 435f890f3a2..e2f6dbec569 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java @@ -193,11 +193,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager return client; } } catch (final Exception e) { - LOGGER.warn( - "failed to borrow client {}:{} for cached leader.", + PipeLogger.log( + LOGGER::warn, + e, + "Failed to borrow client %s:%s for cached leader.", endPoint.getIp(), - endPoint.getPort(), - e); + endPoint.getPort()); } return borrowClient(); @@ -342,11 +343,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager client.close(); client.invalidateAll(); } catch (final Exception e) { - LOGGER.warn( - "Failed to close client {}:{} after handshake failure when the manager is closed.", + PipeLogger.log( + LOGGER::warn, + e, + "Failed to close client %s:%s after handshake failure when the manager is closed.", targetNodeUrl.getIp(), - targetNodeUrl.getPort(), - e); + targetNodeUrl.getPort()); } } client.setShouldReturnSelf(true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java index 56d2c9ad062..32481f43bef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferReq; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp; @@ -116,17 +117,20 @@ public class PipeConsensusTabletBatchEventHandler @Override public void onError(final Exception exception) { - LOGGER.warn( - "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total failed events: {}, related pipe names: {}", - events.size(), + final Object pipeNames = events.stream() .map( event -> event instanceof EnrichedEvent ? ((EnrichedEvent) event).getPipeName() : "UNKNOWN") - .collect(Collectors.toSet()), - exception); + .collect(Collectors.toSet()); + PipeLogger.log( + LOGGER::warn, + exception, + "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total failed events: %s, related pipe names: %s", + events.size(), + pipeNames); connector.addFailureEventsToRetryQueue(events); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java index 9d027e711f3..91c083fe3c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp; import org.apache.iotdb.db.pipe.consensus.PipeConsensusSinkMetrics; @@ -106,14 +107,21 @@ public abstract class PipeConsensusTabletInsertionEventHandler<E extends TPipeCo @Override public void onError(Exception exception) { - LOGGER.warn( - "Failed to transfer TabletInsertionEvent {} (committer key={}, commit id={}).", + final Object eventReportMessage = event instanceof EnrichedEvent ? ((EnrichedEvent) event).coreReportMessage() - : event.toString(), - event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null, - event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() : null, - exception); + : event.toString(); + final Object committerKey = + event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitterKey() : null; + final Object commitId = + event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() : null; + PipeLogger.log( + LOGGER::warn, + exception, + "Failed to transfer TabletInsertionEvent %s (committer key=%s, commit id=%s).", + eventReportMessage, + committerKey, + commitId); connector.addFailureEventToRetryQueue(event); metric.recordRetryCounter(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java index 22b55239e19..3496c0cf857 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp; import org.apache.iotdb.consensus.pipe.thrift.TCommitId; import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp; @@ -273,12 +274,13 @@ public class PipeConsensusTsFileInsertionEventHandler @Override public void onError(final Exception exception) { - LOGGER.warn( - "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit id {}).", + PipeLogger.log( + LOGGER::warn, + exception, + "Failed to transfer TsFileInsertionEvent %s (committer key %s, commit id %s).", tsFile, event.getCommitterKey(), - event.getCommitId(), - exception); + event.getCommitId()); try { if (reader != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index aaef2ae435f..9d0c1563c78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -258,7 +258,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { false)); } } catch (final Exception e) { - LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e); + PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch (%s).", sealedFiles); if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { addFailureEventsToRetryQueue(events, e); } @@ -437,17 +437,19 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { } catch (final Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); - LOGGER.warn( - "Transfer tsfile event {} asynchronously was interrupted.", - pipeTransferTsFileHandler.getTsFile(), - e); + PipeLogger.log( + LOGGER::warn, + e, + "Transfer tsfile event %s asynchronously was interrupted.", + pipeTransferTsFileHandler.getTsFile()); } pipeTransferTsFileHandler.onError(e); - LOGGER.warn( - "Failed to transfer tsfile event {} asynchronously.", - pipeTransferTsFileHandler.getTsFile(), - e); + PipeLogger.log( + LOGGER::warn, + e, + "Failed to transfer tsfile event %s asynchronously.", + pipeTransferTsFileHandler.getTsFile()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index a0e6ad73fe7..40b05066a93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; @@ -105,7 +106,8 @@ public abstract class PipeTransferTrackableHandler client.returnSelf( (e) -> { if (e instanceof IllegalStateException) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index c79a09ec239..8d9648f5292 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -157,8 +157,9 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { this.client = client; if (client == null) { - LOGGER.warn( - "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + PipeLogger.log( + LOGGER::warn, + "Client has been returned to the pool. Current handler status is %s. Will not transfer %s.", sink.isClosed() ? "CLOSED" : "NOT CLOSED", tsFile); return; @@ -420,7 +421,8 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { client.returnSelf( (e) -> { if (e instanceof IllegalStateException) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); return true; } @@ -434,8 +436,9 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { if (client == null) { - LOGGER.warn( - "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + PipeLogger.log( + LOGGER::warn, + "Client has been returned to the pool. Current handler status is %s. Will not transfer %s.", sink.isClosed() ? "CLOSED" : "NOT CLOSED", tsFile); return;
