This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d8090da0a0 Improve the high availability of IoTDB
2d8090da0a0 is described below

commit 2d8090da0a07dec97d3feb51e800cf281abcfdc4
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Oct 26 20:01:03 2023 +0800

    Improve the high availability of IoTDB
---
 .../iot/client/AsyncIoTConsensusServiceClient.java |  6 +-
 .../consensus/iot/client/DispatchLogHandler.java   | 18 +++--
 .../metadata/SeriesOverflowException.java          |  5 +-
 .../plan/planner/plan/FragmentInstance.java        | 11 +++
 .../plan/scheduler/AsyncPlanNodeSender.java        | 83 +++++++++++++++-------
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   | 27 ++++++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 32 ++++++++-
 .../apache/iotdb/commons/client/ThriftClient.java  | 13 +++-
 .../async/AsyncConfigNodeIServiceClient.java       |  6 +-
 .../async/AsyncDataNodeInternalServiceClient.java  |  6 +-
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  6 +-
 .../async/AsyncPipeDataTransferServiceClient.java  |  6 +-
 12 files changed, 174 insertions(+), 45 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index ed32300dd58..f2b77a499ec 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
@@ -110,7 +111,10 @@ public class AsyncIoTConsensusServiceClient extends 
IoTConsensusIService.AsyncCl
       checkReady();
       return true;
     } catch (Exception e) {
-      logger.info("Unexpected exception occurs in {} :", this, e);
+      logger.info(
+          "Unexpected exception occurs in {}, error msg is {}",
+          this,
+          ExceptionUtils.getRootCause(e).toString());
       return false;
     }
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index b851b29981b..a10d031e91f 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
   }
 
-  private boolean needRetry(int statusCode) {
+  public static boolean needRetry(int statusCode) {
     return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
         || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
         || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
@@ -77,12 +78,15 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
 
   @Override
   public void onError(Exception exception) {
-    logger.warn(
-        "Can not send {} to peer for {} times {} because {}",
-        batch,
-        thread.getPeer(),
-        ++retryCount,
-        exception);
+    ++retryCount;
+    if (logger.isWarnEnabled()) {
+      logger.warn(
+          "Can not send {} to peer for {} times {} because {}",
+          batch,
+          thread.getPeer(),
+          retryCount,
+          ExceptionUtils.getRootCause(exception).toString());
+    }
     sleepCorrespondingTimeAndRetryAsynchronous();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
index f8def6004cf..72dd6b75cce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
@@ -28,10 +28,7 @@ public class SeriesOverflowException extends 
MetadataException {
   public SeriesOverflowException(long memoryUsage, long seriesNum) {
     super(
         String.format(
-            "There are too many timeseries in memory. "
-                + "Current memory usage is %s and series num is %s. "
-                + "Please increase MAX_HEAP_SIZE in datanode-env.sh/bat, "
-                + "restart and create timeseries again.",
+            "Too many timeseries in memory without device template(current 
memory: %s, series num: %s). To optimize memory, DEVICE TEMPLATE is more 
recommended when devices have same time series.",
             memoryUsage, seriesNum),
         TSStatusCode.SERIES_OVERFLOW.getStatusCode());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 9a211c8384a..fcef7b54a5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -74,6 +74,9 @@ public class FragmentInstance implements IConsensusRequest {
 
   private boolean isHighestPriority;
 
+  // indicates which replica index we are retrying
+  private transient int nextRetryIndex = 0;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -271,6 +274,14 @@ public class FragmentInstance implements IConsensusRequest 
{
     return hostDataNode;
   }
 
+  public TDataNodeLocation getNextRetriedHostDataNode() {
+    nextRetryIndex =
+        (nextRetryIndex + 1) % 
executorType.getRegionReplicaSet().getDataNodeLocations().size();
+    this.hostDataNode =
+        
executorType.getRegionReplicaSet().getDataNodeLocations().get(nextRetryIndex);
+    return hostDataNode;
+  }
+
   public long getTimeOut() {
     return timeOut;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index 525397123ba..c004356a5f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -35,15 +35,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
-
 public class AsyncPlanNodeSender {
 
   private static final Logger logger = 
LoggerFactory.getLogger(AsyncPlanNodeSender.class);
@@ -53,8 +51,11 @@ public class AsyncPlanNodeSender {
 
   private final Map<TEndPoint, BatchRequestWithIndex> batchRequests;
   private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+
+  private final List<Integer> needRetryInstanceIndex;
+
   private final AtomicLong pendingNumber;
-  private final long startSendTime;
+  private long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -77,6 +78,7 @@ public class AsyncPlanNodeSender {
                   instances.get(i).getRegionReplicaSet().getRegionId()));
     }
     this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size() + 1, 1);
+    this.needRetryInstanceIndex = Collections.synchronizedList(new 
ArrayList<>());
     this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
@@ -84,7 +86,11 @@ public class AsyncPlanNodeSender {
     for (Map.Entry<TEndPoint, BatchRequestWithIndex> entry : 
batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
           new AsyncSendPlanNodeHandler(
-              entry.getValue().getIndexes(), pendingNumber, 
instanceId2RespMap, startSendTime);
+              entry.getValue().getIndexes(),
+              pendingNumber,
+              instanceId2RespMap,
+              needRetryInstanceIndex,
+              startSendTime);
       try {
         AsyncDataNodeInternalServiceClient client =
             asyncInternalServiceClientManager.borrowClient(entry.getKey());
@@ -135,26 +141,55 @@ public class AsyncPlanNodeSender {
     return failureStatusList;
   }
 
-  public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : 
instanceId2RespMap.entrySet()) {
-      if (!entry.getValue().accepted) {
-        logger.warn(
-            "dispatch write failed. status: {}, code: {}, message: {}, node 
{}",
-            entry.getValue().status,
-            TSStatusCode.representOf(entry.getValue().status.code),
-            entry.getValue().message,
-            
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
-        if (entry.getValue().getStatus() == null) {
-          return immediateFuture(
-              new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.WRITE_PROCESS_ERROR, 
entry.getValue().getMessage())));
-        } else {
-          return immediateFuture(new 
FragInstanceDispatchResult(entry.getValue().getStatus()));
-        }
-      }
+  public boolean needRetry() {
+    // retried FI list is not empty and data region replica number is greater 
than 1
+    return !needRetryInstanceIndex.isEmpty()
+        && instances.get(0).getRegionReplicaSet().dataNodeLocations.size() > 1;
+  }
+
+  /**
+   * This function should be called after all last batch responses were 
received. This function will
+   * do the cleaning work and caller won't need to do the cleaning work 
outside this function. We
+   * will retry all failed FIs in the last batch, whose ids were all saved in 
needRetryInstanceIds; if
+   * there are still failed FIs this time, they will also be saved in 
needRetryInstanceIds.
+   *
+   * <p>It's a sync function, which means that once this function has returned, 
the results of this retry
+   * have been received, and you don't need to call waitUntilCompleted.
+   */
+  public void retry() throws InterruptedException {
+    // 1. rebuild the batchRequests using remaining failed FIs, change the 
replica for each failed
+    // FI in this step
+    batchRequests.clear();
+    for (int fragmentInstanceIndex : needRetryInstanceIndex) {
+      this.batchRequests
+          .computeIfAbsent(
+              instances
+                  .get(fragmentInstanceIndex)
+                  .getNextRetriedHostDataNode()
+                  .getInternalEndPoint(),
+              x -> new BatchRequestWithIndex())
+          .addSinglePlanNodeReq(
+              fragmentInstanceIndex,
+              new TSendSinglePlanNodeReq(
+                  new TPlanNode(
+                      instances
+                          .get(fragmentInstanceIndex)
+                          .getFragment()
+                          .getPlanNodeTree()
+                          .serializeToByteBuffer()),
+                  
instances.get(fragmentInstanceIndex).getRegionReplicaSet().getRegionId()));
     }
-    return immediateFuture(new FragInstanceDispatchResult(true));
+
+    // 2. reset the pendingNumber, needRetryInstanceIds and startSendTime
+    needRetryInstanceIndex.clear();
+    pendingNumber.set(batchRequests.keySet().size());
+    startSendTime = System.nanoTime();
+
+    // 3. call sendAll() to retry
+    sendAll();
+
+    // 4. call waitUntilCompleted() to wait for the responses
+    waitUntilCompleted();
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
index 63af1ffedb9..ed7601b0b3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -20,22 +20,27 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.commons.client.ThriftClient.isConnectionBroken;
+
 public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchPlanNodeResp> {
 
   private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
   private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+  private final List<Integer> needRetryInstanceIndex;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
@@ -44,17 +49,23 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
       List<Integer> instanceIds,
       AtomicLong pendingNumber,
       Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap,
+      List<Integer> needRetryInstanceIndex,
       long sendTime) {
     this.instanceIds = instanceIds;
     this.pendingNumber = pendingNumber;
     this.instanceId2RespMap = instanceId2RespMap;
+    this.needRetryInstanceIndex = needRetryInstanceIndex;
     this.sendTime = sendTime;
   }
 
   @Override
   public void onComplete(TSendBatchPlanNodeResp sendBatchPlanNodeResp) {
     for (int i = 0; i < sendBatchPlanNodeResp.getResponses().size(); i++) {
-      instanceId2RespMap.put(instanceIds.get(i), 
sendBatchPlanNodeResp.getResponses().get(i));
+      TSendSinglePlanNodeResp singlePlanNodeResp = 
sendBatchPlanNodeResp.getResponses().get(i);
+      instanceId2RespMap.put(instanceIds.get(i), singlePlanNodeResp);
+      if (needRetry(singlePlanNodeResp)) {
+        needRetryInstanceIndex.add(instanceIds.get(i));
+      }
     }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() 
- sendTime);
@@ -66,6 +77,9 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
 
   @Override
   public void onError(Exception e) {
+    if (needRetry(e)) {
+      needRetryInstanceIndex.addAll(instanceIds);
+    }
     TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
     String errorMsg = String.format("Fail to send plan node, exception 
message: %s", e);
     resp.setAccepted(false);
@@ -80,4 +94,15 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
       }
     }
   }
+
+  private boolean needRetry(Exception e) {
+    Throwable rootCause = ExceptionUtils.getRootCause(e);
+    // if the root cause indicates a broken connection (e.g. SocketException 
with a "Broken pipe"
+    // message, or a refused/reset connection), the remote node may have gone 
offline
+    return isConnectionBroken(rootCause);
+  }
+
+  private boolean needRetry(TSendSinglePlanNodeResp resp) {
+    return !resp.accepted && DispatchLogHandler.needRetry(resp.status.code);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e8929402d1..a1ab241fa6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -210,6 +212,25 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     // wait until remote dispatch done
     try {
       asyncPlanNodeSender.waitUntilCompleted();
+
+      if (asyncPlanNodeSender.needRetry()) {
+        // retry failed remote FIs
+        int retry = 0;
+        final int maxRetryTimes = 10;
+        long waitMillis = getRetrySleepTime(retry);
+
+        while (asyncPlanNodeSender.needRetry()) {
+          retry++;
+          asyncPlanNodeSender.retry();
+          if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) {
+            break;
+          }
+      // still need to retry; sleep some time before making another retry.
+          Thread.sleep(waitMillis);
+          waitMillis = getRetrySleepTime(retry);
+        }
+      }
+
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       logger.error("Interrupted when dispatching write async", e);
@@ -239,6 +260,12 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
+  private long getRetrySleepTime(int retryTimes) {
+    return Math.min(
+        (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),
+        TimeUnit.SECONDS.toMillis(20));
+  }
+
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
@@ -313,7 +340,10 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                   String.format("unknown read type [%s]", 
instance.getType())));
       }
     } catch (ClientManagerException | TException e) {
-      logger.warn("can't connect to node {}", endPoint, e);
+      logger.warn(
+          "can't connect to node {}, error msg is {}.",
+          endPoint,
+          ExceptionUtils.getRootCause(e).toString());
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
       status.setMessage("can't connect to node " + endPoint);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index e9d3b9c36ce..92d933397e6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -25,7 +25,9 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
 import java.net.SocketException;
 
 /**
@@ -88,8 +90,8 @@ public interface ThriftClient {
         if (o.printLogWhenEncounterException()) {
           logger.info(
               "Broken pipe error happened in sending RPC,"
-                  + " we need to clear all previous cached connection",
-              t);
+                  + " we need to clear all previous cached connection, error 
msg is {}",
+              rootCause.toString());
         }
         o.invalidateAll();
       }
@@ -105,6 +107,11 @@ public interface ThriftClient {
   static boolean isConnectionBroken(Throwable cause) {
     return (cause instanceof SocketException && 
cause.getMessage().contains("Broken pipe"))
         || (cause instanceof TTransportException
-            && cause.getMessage().contains("Socket is closed by peer"));
+            && (cause.getMessage().contains("Socket is closed by peer")
+                || cause.getMessage().contains("Read call frame size failed")))
+        || (cause instanceof IOException
+            && (cause.getMessage().contains("Connection reset by peer")
+                || cause.getMessage().contains("Broken pipe")))
+        || (cause instanceof ConnectException && 
cause.getMessage().contains("Connection refused"));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 6f3c00de2a5..36d273c7b78 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
@@ -110,7 +111,10 @@ public class AsyncConfigNodeIServiceClient extends 
IConfigNodeRPCService.AsyncCl
       return true;
     } catch (Exception e) {
       if (printLogWhenEncounterException) {
-        logger.error("Unexpected exception occurs in {} : {}", this, 
e.getMessage());
+        logger.error(
+            "Unexpected exception occurs in {}, error msg is {}",
+            this,
+            ExceptionUtils.getRootCause(e).toString());
       }
       return false;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 15f6c662d9f..7b617788c3e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
@@ -123,7 +124,10 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
       return true;
     } catch (Exception e) {
       if (printLogWhenEncounterException) {
-        logger.error("Unexpected exception occurs in {} : {}", this, 
e.getMessage());
+        logger.error(
+            "Unexpected exception occurs in {}, error msg is {}",
+            this,
+            ExceptionUtils.getRootCause(e).toString());
       }
       return false;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 97c5b1584de..6434ec7f017 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
@@ -111,7 +112,10 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
       return true;
     } catch (Exception e) {
       if (printLogWhenEncounterException) {
-        logger.error("Unexpected exception occurs in {} : {}", this, 
e.getMessage());
+        logger.error(
+            "Unexpected exception occurs in {}, error msg is {}",
+            this,
+            ExceptionUtils.getRootCause(e).toString());
       }
       return false;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index fc45ab60398..83f47956812 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
@@ -127,7 +128,10 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
       return true;
     } catch (Exception e) {
       if (printLogWhenEncounterException) {
-        LOGGER.error("Unexpected exception occurs in {} : {}", this, 
e.getMessage());
+        LOGGER.error(
+            "Unexpected exception occurs in {}, error msg is {}",
+            this,
+            ExceptionUtils.getRootCause(e).toString());
       }
       return false;
     }

Reply via email to