This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8515441b6f3 Pipe: Further reduced the repeating pipe logs (#17366)
8515441b6f3 is described below
commit 8515441b6f3114495d819cef1d2414dc086b501e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 27 11:20:26 2026 +0800
Pipe: Further reduced the repeating pipe logs (#17366)
* co-1
* by
* fix
---
.../statemachine/dataregion/DataExecutionVisitor.java | 14 +++++++-------
.../protocol/thrift/IoTDBDataNodeReceiver.java | 19 +++++++++++--------
.../plan/scheduler/AsyncPlanNodeSender.java | 14 ++++++++------
.../agent/task/subtask/PipeAbstractSinkSubtask.java | 9 +++++----
4 files changed, 31 insertions(+), 25 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index ca813658467..e0184b8595d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -96,21 +96,21 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
}
@Override
- public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion
dataRegion) {
+ public TSStatus visitInsertTablet(final InsertTabletNode node, final
DataRegion dataRegion) {
try {
dataRegion.insertTablet(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
- } catch (OutOfTTLException e) {
- LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
+ } catch (final OutOfTTLException e) {
+ LOGGER.debug("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (WriteProcessRejectException e) {
+ } catch (final WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node,
e.getMessage());
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (WriteProcessException e) {
+ } catch (final WriteProcessException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (BatchProcessException e) {
+ } catch (final BatchProcessException e) {
LOGGER.warn(
"Batch failure in executing a InsertTabletNode. device: {},
startTime: {}, measurements: {}, failing status: {}",
node.getTargetPath(),
@@ -119,7 +119,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
e.getFailingStatus());
// For each error
TSStatus firstStatus = null;
- for (TSStatus status : e.getFailingStatus()) {
+ for (final TSStatus status : e.getFailingStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
firstStatus = status;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 867e2487a50..254928e9191 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -830,16 +830,19 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
final TSStatus result =
executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(statement);
- if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ final int code = result.getCode();
+ if (code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return result;
} else {
- PipeLogger.log(
- LOGGER::warn,
- "Receiver id = %s: Failure status encountered while executing
statement %s: %s",
- receiverId.get(),
- statement.getPipeLoggingString(),
- result);
+ if (code != TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Failure status encountered while executing
statement %s: %s",
+ receiverId.get(),
+ statement.getPipeLoggingString(),
+ result);
+ }
return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index fc1e3d049d9..1bab463050e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -128,12 +128,14 @@ public class AsyncPlanNodeSender {
RpcUtils.getStatus(
TSStatusCode.WRITE_PROCESS_ERROR,
entry.getValue().getMessage())));
} else {
- LOGGER.warn(
- "dispatch write failed. status: {}, code: {}, message: {}, node
{}",
- entry.getValue().status,
- TSStatusCode.representOf(status.code),
- entry.getValue().message,
-
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+ if (status.code != TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+ LOGGER.warn(
+ "dispatch write failed. status: {}, code: {}, message: {},
node {}",
+ entry.getValue().status,
+ TSStatusCode.representOf(status.code),
+ entry.getValue().message,
+
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+ }
failureFragmentInstanceWithStatusList.add(
new FailedFragmentInstanceWithStatus(instance, status));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index ccf16bfa753..cb4bbe47c98 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -163,10 +163,11 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
* @return {@code true} if the {@link PipeSubtask} should be stopped, {@code
false} otherwise
*/
private boolean onPipeConnectionException(final Throwable throwable) {
- LOGGER.warn(
- "PipeConnectionException occurred, {} retries to handshake with the
target system.",
- outputPipeSink.getClass().getName(),
- throwable);
+ PipeLogger.log(
+ LOGGER::warn,
+ throwable,
+ "PipeConnectionException occurred, %s retries to handshake with the
target system.",
+ outputPipeSink.getClass().getName());
int retry = 0;
while (retry < MAX_RETRY_TIMES) {