This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ddeb23b53ce [To dev/1.3] Pipe Log: Reduce repeatable logs (#17700) (#17793)
ddeb23b53ce is described below
commit ddeb23b53cef249d9b2931da866063e489dd382b
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 1 16:08:55 2026 +0800
[To dev/1.3] Pipe Log: Reduce repeatable logs (#17700) (#17793)
Backport the non-i18n log-reduction part of commit
b616502aec0b700d0b7f3a1577e9ecee1edc365b.
---
.../sink/client/IoTDBDataNodeAsyncClientManager.java | 18 ++++++++++--------
.../PipeConsensusTabletBatchEventHandler.java | 14 +++++++++-----
.../PipeConsensusTabletInsertionEventHandler.java | 20 ++++++++++++++------
.../PipeConsensusTsFileInsertionEventHandler.java | 10 ++++++----
.../thrift/async/IoTDBDataRegionAsyncSink.java | 20 +++++++++++---------
.../async/handler/PipeTransferTrackableHandler.java | 4 +++-
.../async/handler/PipeTransferTsFileHandler.java | 13 ++++++++-----
7 files changed, 61 insertions(+), 38 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 435f890f3a2..e2f6dbec569 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -193,11 +193,12 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
return client;
}
} catch (final Exception e) {
- LOGGER.warn(
- "failed to borrow client {}:{} for cached leader.",
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Failed to borrow client %s:%s for cached leader.",
endPoint.getIp(),
- endPoint.getPort(),
- e);
+ endPoint.getPort());
}
return borrowClient();
@@ -342,11 +343,12 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
client.close();
client.invalidateAll();
} catch (final Exception e) {
- LOGGER.warn(
- "Failed to close client {}:{} after handshake failure when the
manager is closed.",
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Failed to close client %s:%s after handshake failure when the
manager is closed.",
targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- e);
+ targetNodeUrl.getPort());
}
}
client.setShouldReturnSelf(true);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index 56d2c9ad062..32481f43bef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
@@ -116,17 +117,20 @@ public class PipeConsensusTabletBatchEventHandler
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
- "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total
failed events: {}, related pipe names: {}",
- events.size(),
+ final Object pipeNames =
events.stream()
.map(
event ->
event instanceof EnrichedEvent
? ((EnrichedEvent) event).getPipeName()
: "UNKNOWN")
- .collect(Collectors.toSet()),
- exception);
+ .collect(Collectors.toSet());
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "PipeConsensus: Failed to transfer TabletInsertionEvent batch. Total
failed events: %s, related pipe names: %s",
+ events.size(),
+ pipeNames);
connector.addFailureEventsToRetryQueue(events);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 9d027e711f3..91c083fe3c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusSinkMetrics;
@@ -106,14 +107,21 @@ public abstract class
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
@Override
public void onError(Exception exception) {
- LOGGER.warn(
- "Failed to transfer TabletInsertionEvent {} (committer key={}, commit
id={}).",
+ final Object eventReportMessage =
event instanceof EnrichedEvent
? ((EnrichedEvent) event).coreReportMessage()
- : event.toString(),
- event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null,
- event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId()
: null,
- exception);
+ : event.toString();
+ final Object committerKey =
+ event instanceof EnrichedEvent ? ((EnrichedEvent)
event).getCommitterKey() : null;
+ final Object commitId =
+ event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId()
: null;
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TabletInsertionEvent %s (committer key=%s, commit
id=%s).",
+ eventReportMessage,
+ committerKey,
+ commitId);
connector.addFailureEventToRetryQueue(event);
metric.recordRetryCounter();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 22b55239e19..3496c0cf857 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
@@ -273,12 +274,13 @@ public class PipeConsensusTsFileInsertionEventHandler
@Override
public void onError(final Exception exception) {
- LOGGER.warn(
- "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit
id {}).",
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ "Failed to transfer TsFileInsertionEvent %s (committer key %s, commit
id %s).",
tsFile,
event.getCommitterKey(),
- event.getCommitId(),
- exception);
+ event.getCommitId());
try {
if (reader != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index aaef2ae435f..9d0c1563c78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -258,7 +258,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
false));
}
} catch (final Exception e) {
- LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e);
+ PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch
(%s).", sealedFiles);
if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
addFailureEventsToRetryQueue(events, e);
}
@@ -437,17 +437,19 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
} catch (final Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
- LOGGER.warn(
- "Transfer tsfile event {} asynchronously was interrupted.",
- pipeTransferTsFileHandler.getTsFile(),
- e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Transfer tsfile event %s asynchronously was interrupted.",
+ pipeTransferTsFileHandler.getTsFile());
}
pipeTransferTsFileHandler.onError(e);
- LOGGER.warn(
- "Failed to transfer tsfile event {} asynchronously.",
- pipeTransferTsFileHandler.getTsFile(),
- e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Failed to transfer tsfile event %s asynchronously.",
+ pipeTransferTsFileHandler.getTsFile());
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index a0e6ad73fe7..40b05066a93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -105,7 +106,8 @@ public abstract class PipeTransferTrackableHandler
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
"Illegal state when return the client to object pool, maybe
the pool is already cleared. Will ignore.");
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index c79a09ec239..8d9648f5292 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -157,8 +157,9 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
this.client = client;
if (client == null) {
- LOGGER.warn(
- "Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Client has been returned to the pool. Current handler status is %s.
Will not transfer %s.",
sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;
@@ -420,7 +421,8 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
"Illegal state when return the client to object pool, maybe
the pool is already cleared. Will ignore.");
return true;
}
@@ -434,8 +436,9 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
if (client == null) {
- LOGGER.warn(
- "Client has been returned to the pool. Current handler status is {}.
Will not transfer {}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Client has been returned to the pool. Current handler status is %s.
Will not transfer %s.",
sink.isClosed() ? "CLOSED" : "NOT CLOSED",
tsFile);
return;