This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch receiver-delete-file-failed in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7fc89bbf7d1fa7a3246c7312904af1137fbb7381 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Apr 8 15:45:08 2024 +0800 fix --- .../receiver/protocol/IoTDBConfigNodeReceiver.java | 31 +++++++++++++++------- .../protocol/thrift/IoTDBDataNodeReceiver.java | 20 ++++++++++---- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 28e08e231a8..6f2fb7966db 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -88,13 +88,13 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0); - private final ConfigManager configManager = ConfigNode.getInstance().getConfigManager(); - - private static final PipeConfigPhysicalPlanTSStatusVisitor statusVisitor = + private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR = new PipeConfigPhysicalPlanTSStatusVisitor(); - private static final PipeConfigPhysicalPlanExceptionVisitor exceptionVisitor = + private static final PipeConfigPhysicalPlanExceptionVisitor EXCEPTION_VISITOR = new PipeConfigPhysicalPlanExceptionVisitor(); + private final ConfigManager configManager = ConfigNode.getInstance().getConfigManager(); + @Override public TPipeTransferResp receive(final TPipeTransferReq req) { try { @@ -135,13 +135,16 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_TYPE_ERROR, String.format("Unsupported PipeRequestType on ConfigNode %s.", rawRequestType)); - LOGGER.warn("Unsupported PipeRequestType on ConfigNode, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Unsupported PipeRequestType on ConfigNode, response status = {}.", + receiverId.get(), + status); return new TPipeTransferResp(status); } catch (Exception e) { final String error = "Exception encountered while handling pipe transfer request. Root cause: " + e.getMessage(); - LOGGER.warn(error, e); + LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e); return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error)); } } @@ -167,12 +170,20 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { try { result = executePlan(plan); if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn("Failure status encountered while executing plan {}: {}", plan, result); - result = statusVisitor.process(plan, result); + LOGGER.warn( + "Receiver id = {}: Failure status encountered while executing plan {}: {}", + receiverId.get(), + plan, + result); + result = STATUS_VISITOR.process(plan, result); } } catch (Exception e) { - LOGGER.warn("Exception encountered while executing plan {}: ", plan, e); - result = exceptionVisitor.process(plan, e); + LOGGER.warn( + "Receiver id = {}: Exception encountered while executing plan {}: ", + receiverId.get(), + plan, + e); + result = EXCEPTION_VISITOR.process(plan, e); } return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 783069c566a..94f288a0f6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -182,11 +182,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { RpcUtils.getStatus( TSStatusCode.PIPE_TYPE_ERROR, String.format("Unknown PipeRequestType %s.", rawRequestType)); - LOGGER.warn("Unknown PipeRequestType, response status = {}.", status); + LOGGER.warn( + "Receiver id = {}: Unknown PipeRequestType, response status = {}.", + receiverId.get(), + status); return new TPipeTransferResp(status); } catch (IOException e) { - String error = String.format("Serialization error during pipe receiving, %s", e); - LOGGER.warn(error); + final String error = String.format("Serialization error during pipe receiving, %s", e); + LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e); return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error)); } } @@ -312,11 +315,18 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { return result; } else { LOGGER.warn( - "Failure status encountered while executing statement {}: {}", statement, result); + "Receiver id = {}: Failure status encountered while executing statement {}: {}", + receiverId.get(), + statement, + result); return statement.accept(statusVisitor, result); } } catch (Exception e) { - LOGGER.warn("Exception encountered while executing statement {}: ", statement, e); + LOGGER.warn( + "Receiver id = {}: Exception encountered while executing statement {}: ", + receiverId.get(), + statement, + e); return statement.accept(exceptionVisitor, e); } }
