This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch receiver-delete-file-failed
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7fc89bbf7d1fa7a3246c7312904af1137fbb7381
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 8 15:45:08 2024 +0800

    fix
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 31 +++++++++++++++-------
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 20 ++++++++++----
 2 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 28e08e231a8..6f2fb7966db 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -88,13 +88,13 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
 
   private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
 
-  private final ConfigManager configManager = 
ConfigNode.getInstance().getConfigManager();
-
-  private static final PipeConfigPhysicalPlanTSStatusVisitor statusVisitor =
+  private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
       new PipeConfigPhysicalPlanTSStatusVisitor();
-  private static final PipeConfigPhysicalPlanExceptionVisitor exceptionVisitor 
=
+  private static final PipeConfigPhysicalPlanExceptionVisitor 
EXCEPTION_VISITOR =
       new PipeConfigPhysicalPlanExceptionVisitor();
 
+  private final ConfigManager configManager = 
ConfigNode.getInstance().getConfigManager();
+
   @Override
   public TPipeTransferResp receive(final TPipeTransferReq req) {
     try {
@@ -135,13 +135,16 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TYPE_ERROR,
               String.format("Unsupported PipeRequestType on ConfigNode %s.", 
rawRequestType));
-      LOGGER.warn("Unsupported PipeRequestType on ConfigNode, response status 
= {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Unsupported PipeRequestType on ConfigNode, 
response status = {}.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     } catch (Exception e) {
       final String error =
           "Exception encountered while handling pipe transfer request. Root 
cause: "
               + e.getMessage();
-      LOGGER.warn(error, e);
+      LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
@@ -167,12 +170,20 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
     try {
       result = executePlan(plan);
       if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn("Failure status encountered while executing plan {}: {}", 
plan, result);
-        result = statusVisitor.process(plan, result);
+        LOGGER.warn(
+            "Receiver id = {}: Failure status encountered while executing plan 
{}: {}",
+            receiverId.get(),
+            plan,
+            result);
+        result = STATUS_VISITOR.process(plan, result);
       }
     } catch (Exception e) {
-      LOGGER.warn("Exception encountered while executing plan {}: ", plan, e);
-      result = exceptionVisitor.process(plan, e);
+      LOGGER.warn(
+          "Receiver id = {}: Exception encountered while executing plan {}: ",
+          receiverId.get(),
+          plan,
+          e);
+      result = EXCEPTION_VISITOR.process(plan, e);
     }
     return result;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 783069c566a..94f288a0f6d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -182,11 +182,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TYPE_ERROR,
               String.format("Unknown PipeRequestType %s.", rawRequestType));
-      LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Unknown PipeRequestType, response status = {}.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     } catch (IOException e) {
-      String error = String.format("Serialization error during pipe receiving, 
%s", e);
-      LOGGER.warn(error);
+      final String error = String.format("Serialization error during pipe 
receiving, %s", e);
+      LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
@@ -312,11 +315,18 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
         return result;
       } else {
         LOGGER.warn(
-            "Failure status encountered while executing statement {}: {}", 
statement, result);
+            "Receiver id = {}: Failure status encountered while executing 
statement {}: {}",
+            receiverId.get(),
+            statement,
+            result);
         return statement.accept(statusVisitor, result);
       }
     } catch (Exception e) {
-      LOGGER.warn("Exception encountered while executing statement {}: ", 
statement, e);
+      LOGGER.warn(
+          "Receiver id = {}: Exception encountered while executing statement 
{}: ",
+          receiverId.get(),
+          statement,
+          e);
       return statement.accept(exceptionVisitor, e);
     }
   }

Reply via email to