This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch fix-audit-logger in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 186837f0579073cce17b29e2427d04504e17d724 Author: Caideyipi <[email protected]> AuthorDate: Sun Sep 28 11:46:28 2025 +0800 Pipe: Reduced the conversion logger & Fixed the illegal formats of PipeLogger (#16503) * fix-grass * fix --- .../protocol/thrift/IoTDBDataNodeReceiver.java | 8 ++--- .../statement/PipeConvertedInsertRowStatement.java | 13 ++++--- .../commons/pipe/receiver/IoTDBFileReceiver.java | 40 +++++++++++----------- .../pipe/receiver/PipeReceiverStatusHandler.java | 2 +- 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 1511933ad35..a65ec2b266d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -670,7 +670,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to check authority for statement {}, username = {}, response = {}.", + "Receiver id = %s: Failed to check authority for statement %s, username = %s, response = %s.", receiverId.get(), StatementType.ALTER_LOGICAL_VIEW.name(), username, @@ -822,7 +822,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { } else { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failure status encountered while executing statement {}: {}", + "Receiver id = %s: Failure status encountered while executing statement %s: %s", receiverId.get(), statement, result); @@ -831,7 +831,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Exception encountered while executing statement {}: ", + "Receiver id = %s: Exception encountered while executing statement %s: ", receiverId.get(), statement, e); @@ -890,7 +890,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { if (permissionCheckStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to check authority for statement {}, username = {}, response = {}.", + "Receiver id = %s: Failed to check authority for statement %s, username = %s, response = %s.", receiverId.get(), statement.getType().name(), username, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java index 2484fd18de8..a81e1206cf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.transform.statement; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -91,8 +92,9 @@ public class PipeConvertedInsertRowStatement extends InsertRowStatement { @Override protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { - LOGGER.info( - "Pipe: Inserting row to {}.{}. Casting type from {} to {}.", + PipeLogger.log( + LOGGER::info, + "Pipe: Inserting row to %s.%s. Casting type from %s to %s.", devicePath, measurements[columnIndex], dataTypes[columnIndex], @@ -127,9 +129,10 @@ public class PipeConvertedInsertRowStatement extends InsertRowStatement { try { values[i] = ValueConverter.parse(values[i].toString(), dataTypes[i]); } catch (Exception e) { - LOGGER.warn( - "data type of {}.{} is not consistent, " - + "registered type {}, inserting timestamp {}, value {}", + PipeLogger.log( + LOGGER::warn, + "data type of %s.%s is not consistent, " + + "registered type %s, inserting timestamp %s, value %s", devicePath, measurements[i], dataTypes[i], diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index de9204615b0..a0000f2d200 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -143,7 +143,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } catch (Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to delete original receiver file dir {}, because {}.", + "Receiver id = %s: Failed to delete original receiver file dir %s, because %s.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), e.getMessage(), @@ -174,14 +174,14 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (Objects.isNull(receiverFileBaseDir)) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to init pipe receiver file folder manager because all disks of folders are full.", + "Receiver id = %s: Failed to init pipe receiver file folder manager because all disks of folders are full.", receiverId.get()); return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT)); } } catch (Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to create pipe receiver file folder because all disks of folders are full.", + "Receiver id = %s: Failed to create pipe receiver file folder because all disks of folders are full.", receiverId.get(), e); return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT)); @@ -234,7 +234,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { "Receiver can not get clusterId from config node."); PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Handshake failed, response status = {}.", + "Receiver id = %s: Handshake failed, response status = %s.", receiverId.get(), status); return new TPipeTransferResp(status); @@ -311,7 +311,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Handshake failed because login failed, response status = {}.", + "Receiver id = %s: Handshake failed because login failed, response status = %s.", receiverId.get(), status); return new TPipeTransferResp(status); @@ -375,7 +375,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (permissionCheckStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to login, username = {}, response = {}.", + "Receiver id = %s: Failed to login, username = %s, response = %s.", receiverId.get(), username, permissionCheckStatus); @@ -432,7 +432,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { PipeLogger.log( LOGGER::warn, e, - "Receiver id = %s: Failed to write file piece from req {}.", + "Receiver id = %s: Failed to write file piece from req %s.", receiverId.get(), req); final TSStatus status = @@ -511,7 +511,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to close current writing file writer {}, because {}.", + "Receiver id = %s: Failed to close current writing file writer %s, because %s.", receiverId.get(), writingFile == null ? "null" : writingFile.getPath(), e.getMessage(), @@ -550,7 +550,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to delete original writing file {}, because {}.", + "Receiver id = %s: Failed to delete original writing file %s, because %s.", receiverId.get(), file.getPath(), e.getMessage(), @@ -571,7 +571,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { if (!offsetCorrect) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Writing file {}'s offset is {}, but request sender's offset is {}.", + "Receiver id = %s: Writing file %s's offset is %s, but request sender's offset is %s.", receiverId.get(), writingFile.getPath(), writingFileWriter.length(), @@ -626,7 +626,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, because {}.", + "Receiver id = %s: Failed to seal file %s, because %s.", receiverId.get(), fileAbsolutePath, status.getMessage()); @@ -635,7 +635,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {} from req {}.", + "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), writingFile, req, @@ -723,7 +723,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } else { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, status is {}.", + "Receiver id = %s: Failed to seal file %s, status is %s.", receiverId.get(), fileAbsolutePaths, status); @@ -732,7 +732,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {} from req {}.", + "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), files, req, @@ -761,7 +761,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { String.format("Failed to seal file %s, the file does not exist.", fileName)); PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, because the file does not exist.", + "Receiver id = %s: Failed to seal file %s, because the file does not exist.", receiverId.get(), fileName); return new TPipeTransferResp(status); @@ -777,8 +777,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { fileName, fileLength, writingFileWriter.length())); PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, because the length of file is not correct. " - + "The original file has length {}, but receiver file has length {}.", + "Receiver id = %s: Failed to seal file %s, because the length of file is not correct. " + + "The original file has length %s, but receiver file has length %s.", receiverId.get(), fileName, fileLength, @@ -799,7 +799,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { "Failed to seal file %s, because writing file is %s.", fileName, writingFile)); PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, because writing file is {}.", + "Receiver id = %s: Failed to seal file %s, because writing file is %s.", receiverId.get(), fileName, writingFile); @@ -816,8 +816,8 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { fileName, fileLength, writingFileWriter.length())); PipeLogger.log( LOGGER::warn, - "Receiver id = {}: Failed to seal file {}, because the length of file is not correct. " - + "The original file has length {}, but receiver file has length {}.", + "Receiver id = %s: Failed to seal file %s, because the length of file is not correct. " + + "The original file has length %s, but receiver file has length %s.", receiverId.get(), fileName, fileLength, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index e776645a7d2..8f00e31c28a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -194,7 +194,7 @@ public class PipeReceiverStatusHandler { // Reduce the log if retry forever if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) { - PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: {}", status); + PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status); } else { LOGGER.warn( "No permission: will retry for at least {} seconds. status: {}",
