This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 9d2a98b70f2 [to dev/1.3] [RTO/RPO] Unify retry logic on SessionConnection (#14927)
9d2a98b70f2 is described below

commit 9d2a98b70f238c4a4151eef237484374bff80592
Author: William Song <[email protected]>
AuthorDate: Mon Feb 24 10:19:08 2025 +0800

    [to dev/1.3] [RTO/RPO] Unify retry logic on SessionConnection (#14927)
    
    * cherry-pick merge
    
    * add back load analyze exp
    
    * add back load analyze exp
---
 .../apache/iotdb/session/SessionConnection.java    | 1195 ++++++--------------
 1 file changed, 331 insertions(+), 864 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 4a8c3a36ce5..425ae908e43 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -71,6 +71,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -262,23 +263,14 @@ public class SessionConnection {
 
   protected void setTimeZone(String zoneId)
       throws StatementExecutionException, IoTDBConnectionException {
-    TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
-    TSStatus resp;
-    try {
-      resp = client.setTimeZone(req);
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          req.setSessionId(sessionId);
-          resp = client.setTimeZone(req);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
-    RpcUtils.verifySuccess(resp);
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, 
zoneId);
+                  return client.setTimeZone(req);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
     setTimeZoneOfSession(zoneId);
   }
 
@@ -295,93 +287,52 @@ public class SessionConnection {
 
   protected void setStorageGroup(String storageGroup)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          RpcUtils.verifySuccess(client.setStorageGroup(sessionId, 
storageGroup));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(() -> client.setStorageGroup(sessionId, 
storageGroup)).getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void deleteStorageGroups(List<String> storageGroups)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroups));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroups));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(() -> client.deleteStorageGroups(sessionId, 
storageGroups)).getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createTimeseries(TSCreateTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.createTimeseries(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createTimeseries(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createAlignedTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.createMultiTimeseries(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createMultiTimeseries(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createMultiTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected boolean checkTimeseriesExists(String path, long timeout)
@@ -406,26 +357,22 @@ public class SessionConnection {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, 
statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeout);
-    TSExecuteStatementResp execResp;
-    try {
-      execReq.setEnableRedirectQuery(enableRedirect);
-      execResp = client.executeQueryStatementV2(execReq);
+    execReq.setEnableRedirectQuery(enableRedirect);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              execReq.setSessionId(sessionId);
+              execReq.setStatementId(statementId);
+              return client.executeQueryStatementV2(execReq);
+            });
+    TSExecuteStatementResp execResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execReq.setSessionId(sessionId);
-          execReq.setStatementId(statementId);
-          execResp = client.executeQueryStatementV2(execReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(execResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(
         sql,
         execResp.getColumns(),
@@ -445,49 +392,8 @@ public class SessionConnection {
 
   protected void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
-
     TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql, 
statementId);
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = executeNonQueryStatementInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerify(() -> executeNonQueryStatementInternal(request));
   }
 
   private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq 
request)
@@ -504,26 +410,23 @@ public class SessionConnection {
         new TSRawDataQueryReq(sessionId, paths, startTime, endTime, 
statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeOut);
-    TSExecuteStatementResp execResp;
-    try {
-      execReq.setEnableRedirectQuery(enableRedirect);
-      execResp = client.executeRawDataQueryV2(execReq);
+    execReq.setEnableRedirectQuery(enableRedirect);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              execReq.setSessionId(sessionId);
+              execReq.setStatementId(statementId);
+              return client.executeRawDataQueryV2(execReq);
+            });
+
+    TSExecuteStatementResp execResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execReq.setSessionId(sessionId);
-          execReq.setStatementId(statementId);
-          execResp = client.executeRawDataQueryV2(execReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(execResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(
         "",
         execResp.getColumns(),
@@ -548,28 +451,27 @@ public class SessionConnection {
     req.setEnableRedirectQuery(enableRedirect);
     req.setLegalPathNodes(isLegalPathNodes);
     req.setTimeout(timeOut);
-    TSExecuteStatementResp tsExecuteStatementResp = null;
     TEndPoint redirectedEndPoint = null;
-    try {
-      tsExecuteStatementResp = 
client.executeFastLastDataQueryForOneDeviceV2(req);
-      
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (RedirectException e) {
-      redirectedEndPoint = e.getEndPoint();
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          req.setSessionId(sessionId);
-          req.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeFastLastDataQueryForOneDeviceV2(req);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              req.setSessionId(sessionId);
+              req.setStatementId(statementId);
+              return client.executeFastLastDataQueryForOneDeviceV2(req);
+            });
+
+    TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
+      try {
+        
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+      } catch (RedirectException e) {
+        redirectedEndPoint = e.getEndPoint();
       }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new Pair<>(
         new SessionDataSet(
             "",
@@ -594,25 +496,22 @@ public class SessionConnection {
     tsLastDataQueryReq.setFetchSize(session.fetchSize);
     tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
     tsLastDataQueryReq.setTimeout(timeOut);
-    TSExecuteStatementResp tsExecuteStatementResp;
-    try {
-      tsExecuteStatementResp = 
client.executeLastDataQueryV2(tsLastDataQueryReq);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              tsLastDataQueryReq.setSessionId(sessionId);
+              tsLastDataQueryReq.setStatementId(statementId);
+              return client.executeLastDataQueryV2(tsLastDataQueryReq);
+            });
+    final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+
+    if (result.getRetryAttempts() == 0) {
       
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          tsLastDataQueryReq.setSessionId(sessionId);
-          tsLastDataQueryReq.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeLastDataQueryV2(tsLastDataQueryReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new SessionDataSet(
         "",
         tsExecuteStatementResp.getColumns(),
@@ -676,25 +575,21 @@ public class SessionConnection {
 
   private SessionDataSet executeAggregationQuery(TSAggregationQueryReq 
tsAggregationQueryReq)
       throws StatementExecutionException, IoTDBConnectionException, 
RedirectException {
-    TSExecuteStatementResp tsExecuteStatementResp;
-    try {
-      tsExecuteStatementResp = 
client.executeAggregationQueryV2(tsAggregationQueryReq);
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              tsAggregationQueryReq.setSessionId(sessionId);
+              tsAggregationQueryReq.setStatementId(statementId);
+              return client.executeAggregationQueryV2(tsAggregationQueryReq);
+            });
+
+    TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          tsAggregationQueryReq.setSessionId(sessionId);
-          tsAggregationQueryReq.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeAggregationQuery(tsAggregationQueryReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new SessionDataSet(
         "",
         tsExecuteStatementResp.getColumns(),
@@ -721,52 +616,7 @@ public class SessionConnection {
 
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
   }
 
   private TSStatus insertRecordInternal(TSInsertRecordReq request) throws 
TException {
@@ -776,52 +626,7 @@ public class SessionConnection {
 
   protected void insertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
   }
 
   private TSStatus insertRecordInternal(TSInsertStringRecordReq request) 
throws TException {
@@ -831,52 +636,8 @@ public class SessionConnection {
 
   protected void insertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertRecordsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws 
TException {
@@ -886,53 +647,8 @@ public class SessionConnection {
 
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertRecordsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request) 
throws TException {
@@ -942,53 +658,7 @@ public class SessionConnection {
 
   protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq 
request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsOfOneDeviceInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> 
insertRecordsOfOneDeviceInternal(request));
   }
 
   private TSStatus 
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
@@ -999,53 +669,7 @@ public class SessionConnection {
 
   protected void 
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertStringRecordsOfOneDeviceInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> 
insertStringRecordsOfOneDeviceInternal(request));
   }
 
   private TSStatus insertStringRecordsOfOneDeviceInternal(
@@ -1054,57 +678,48 @@ public class SessionConnection {
     return client.insertStringRecordsOfOneDevice(request);
   }
 
-  protected void insertTablet(TSInsertTabletReq request)
-      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+  private void callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+      TFunction<TSStatus> function, Supplier<List<String>> pathSupplier)
+      throws StatementExecutionException, RedirectException, 
IoTDBConnectionException {
+    RetryResult<TSStatus> result = callWithRetry(function);
 
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertTabletInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
+    TSStatus status = result.getResult();
+    if (status != null) {
+      if (result.getRetryAttempts() == 0) {
+        RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
pathSupplier.get());
+      } else {
+        RpcUtils.verifySuccess(status);
       }
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
     }
+  }
 
+  private void callWithRetryAndVerifyWithRedirection(TFunction<TSStatus> 
function)
+      throws StatementExecutionException, RedirectException, 
IoTDBConnectionException {
+    RetryResult<TSStatus> result = callWithRetry(function);
+
+    TSStatus status = result.getResult();
     if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
+      if (result.getRetryAttempts() == 0) {
+        RpcUtils.verifySuccessWithRedirection(status);
+      } else {
+        RpcUtils.verifySuccess(status);
+      }
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
     } else {
       throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
+  protected void insertTablet(TSInsertTabletReq request)
+      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+    callWithRetryAndVerifyWithRedirection(() -> insertTabletInternal(request));
+  }
+
   private TSStatus insertTabletInternal(TSInsertTabletReq request) throws 
TException {
     request.setSessionId(sessionId);
     return client.insertTablet(request);
@@ -1112,53 +727,8 @@ public class SessionConnection {
 
   protected void insertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertTabletsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertTabletsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws TException {
@@ -1168,55 +738,31 @@ public class SessionConnection {
 
   protected void deleteTimeseries(List<String> paths)
       throws IoTDBConnectionException, StatementExecutionException {
+    callWithRetryAndVerify(() -> client.deleteTimeseries(sessionId, paths));
+  }
 
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = client.deleteTimeseries(sessionId, paths);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
+  public void deleteData(TSDeleteDataReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    callWithRetryAndVerify(() -> deleteDataInternal(request));
+  }
 
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
+  private void callWithRetryAndVerify(TFunction<TSStatus> rpc)
+      throws IoTDBConnectionException, StatementExecutionException {
+    RetryResult<TSStatus> result = callWithRetry(rpc);
+    if (result.getResult() != null) {
+      RpcUtils.verifySuccess(result.getResult());
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
     } else {
       throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
-  public void deleteData(TSDeleteDataReq request)
-      throws IoTDBConnectionException, StatementExecutionException {
-
+  private RetryResult<TSStatus> callWithRetry(TFunction<TSStatus> rpc) {
     TException lastTException = null;
     TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
+    int i;
+    for (i = 0; i <= maxRetryCount; i++) {
       if (i > 0) {
         // re-init the TException and TSStatus
         lastTException = null;
@@ -1225,7 +771,13 @@ public class SessionConnection {
         try {
           TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
         } catch (InterruptedException e) {
-          // just ignore
+          Thread.currentThread().interrupt();
+          logger.warn(
+              "Thread {} was interrupted during retry {} with wait time {} ms. Exiting retry loop.",
+              Thread.currentThread().getName(),
+              i,
+              retryIntervalInMs);
+          break;
         }
         if (!reconnect()) {
           // reconnect failed, just continue to make another retry.
@@ -1233,27 +785,19 @@ public class SessionConnection {
         }
       }
       try {
-        status = deleteDataInternal(request);
+        status = rpc.run();
         // need retry
         if (status.isSetNeedRetry() && status.isNeedRetry()) {
           continue;
         }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
+        break;
       } catch (TException e) {
         // all network exception need retry until reaching maxRetryCount
         lastTException = e;
       }
     }
 
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    return new RetryResult<>(status, lastTException, i);
   }
 
   private TSStatus deleteDataInternal(TSDeleteDataReq request) throws TException {
@@ -1263,116 +807,74 @@ public class SessionConnection {
 
   protected void testInsertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertStringRecord(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertStringRecord(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertStringRecord(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertRecord(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertRecord(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertRecord(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   public void testInsertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertStringRecords(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertStringRecords(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertStringRecords(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   public void testInsertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertRecords(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertRecords(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertRecords(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertTablet(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertTablet(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertTablet(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.testInsertTablets(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertTablets(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertTablets(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   @SuppressWarnings({
@@ -1425,189 +927,121 @@ public class SessionConnection {
 
   protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.createSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.appendSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.pruneSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
       throws StatementExecutionException, IoTDBConnectionException {
-    TSQueryTemplateResp execResp;
-    req.setSessionId(sessionId);
-    try {
-      execResp = client.querySchemaTemplate(req);
-      RpcUtils.verifySuccess(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execResp = client.querySchemaTemplate(req);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
-
+    final TSQueryTemplateResp execResp =
+        callWithReconnect(
+                () -> {
+                  req.setSessionId(sessionId);
+                  return client.querySchemaTemplate(req);
+                })
+            .getResult();
     RpcUtils.verifySuccess(execResp.getStatus());
     return execResp;
   }
 
   protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.setSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.setSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.setSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.unsetSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.dropSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createTimeseriesUsingSchemaTemplate(
       TCreateTimeseriesUsingSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
-      }
-    }
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createTimeseriesUsingSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected TSBackupConfigurationResp getBackupConfiguration()
       throws IoTDBConnectionException, StatementExecutionException {
-    TSBackupConfigurationResp execResp;
-    try {
-      execResp = client.getBackupConfiguration();
-      RpcUtils.verifySuccess(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execResp = client.getBackupConfiguration();
-          RpcUtils.verifySuccess(execResp.getStatus());
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    final TSBackupConfigurationResp execResp =
+        callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+    RpcUtils.verifySuccess(execResp.getStatus());
     return execResp;
   }
 
-  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+  private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier)
+      throws IoTDBConnectionException {
+    T ret;
     try {
-      return client.fetchAllConnectionsInfo();
+      ret = supplier.run();
+      return new RetryResult<>(ret, null, 0);
     } catch (TException e) {
       if (reconnect()) {
         try {
-          return client.fetchAllConnectionsInfo();
+          ret = supplier.run();
+          return new RetryResult<>(ret, null, 1);
         } catch (TException tException) {
           throw new IoTDBConnectionException(tException);
         }
@@ -1617,6 +1051,10 @@ public class SessionConnection {
     }
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+    return callWithReconnect(() -> client.fetchAllConnectionsInfo()).getResult();
+  }
+
   public boolean isEnableRedirect() {
     return enableRedirect;
   }
@@ -1652,4 +1090,33 @@ public class SessionConnection {
   public String toString() {
     return "SessionConnection{" + " endPoint=" + endPoint + "}";
   }
+
+  private interface TFunction<T> {
+    T run() throws TException;
+  }
+
+  private static class RetryResult<T> {
+    private final T result;
+    private final TException exception;
+    private final int retryAttempts;
+
+    public RetryResult(T result, TException exception, int retryAttempts) {
+      Preconditions.checkArgument(result == null || exception == null);
+      this.result = result;
+      this.exception = exception;
+      this.retryAttempts = retryAttempts;
+    }
+
+    public int getRetryAttempts() {
+      return retryAttempts;
+    }
+
+    public TException getException() {
+      return exception;
+    }
+
+    public T getResult() {
+      return result;
+    }
+  }
 }

Reply via email to