This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch SessionRetry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11b9ca862f4ffa1d99591c200e4295fb787ec2e2 Author: JackieTien97 <[email protected]> AuthorDate: Mon Oct 12 18:22:46 2020 +0800 add session retry --- .../main/java/org/apache/iotdb/session/Config.java | 3 +- .../java/org/apache/iotdb/session/Session.java | 96 +++++++++++++++++++--- 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java index 325ae26..067b895 100644 --- a/session/src/main/java/org/apache/iotdb/session/Config.java +++ b/session/src/main/java/org/apache/iotdb/session/Config.java @@ -24,5 +24,6 @@ public class Config { public static final String DEFAULT_PASSWORD = "password"; public static final int DEFAULT_FETCH_SIZE = 10000; public static final int DEFAULT_TIMEOUT_MS = 0; - + public static final int RETRY_NUM = 3; + public static final long RETRY_INTERVAL = 1000; } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 4fc2404..17c38da 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -29,9 +29,6 @@ import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; -import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; -import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; @@ -40,12 +37,16 @@ import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; import org.apache.iotdb.service.rpc.thrift.TSIService; -import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -66,7 +67,6 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; public class Session { @@ -83,6 +83,8 @@ public class Session { private ZoneId zoneId; private long statementId; private int fetchSize; + private boolean enableRPCCompression; + private int connectionTimeoutInMs; public Session(String host, int rpcPort) { this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD); @@ -122,6 +124,9 @@ public class Session { return; } + this.enableRPCCompression = enableRPCCompression; + this.connectionTimeoutInMs = connectionTimeoutInMs; + transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs)); if (!transport.isOpen()) { @@ -239,7 +244,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertTablet(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertTablet(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -293,7 +306,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertTablets(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertTablets(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -345,7 +366,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertRecords(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -393,7 +422,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertStringRecords(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertStringRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -430,7 +467,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertRecord(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -465,7 +510,15 @@ public class Session { try { RpcUtils.verifySuccess(client.insertStringRecord(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertStringRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status"); + } } } @@ -1057,4 +1110,25 @@ public class Session { } } + private boolean reconnect() { + boolean flag = false; + for (int i = 1; i <= Config.RETRY_NUM; i++) { + try { + if (transport != null) { + close(); + open(enableRPCCompression, connectionTimeoutInMs); + flag = true; + } + } catch (Exception e) { + try { + Thread.sleep(Config.RETRY_INTERVAL); + } catch (InterruptedException e1) { + logger.error("reconnect is interrupted.", e1); + Thread.currentThread().interrupt(); + } + } + } + return flag; + } + }
