This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch caLastOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e037bc853b5a4497b804edc4bffa22c1a60b503e
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 16 20:17:06 2023 +0800

    Add session interface
---
 .../java/org/apache/iotdb/isession/ISession.java   |  3 ++
 .../apache/iotdb/isession/pool/ISessionPool.java   |  4 ++
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++++++++
 .../apache/iotdb/session/SessionConnection.java    | 49 ++++++++++++++++++++++
 .../org/apache/iotdb/session/pool/SessionPool.java | 24 +++++++++++
 5 files changed, 111 insertions(+)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe368..2543512c1b 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -165,6 +165,9 @@ public interface ISession extends AutoCloseable {
   SessionDataSet executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSet executeLastDataQueryForOneDevice(String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException;
+
   SessionDataSet executeAggregationQuery(List<String> paths, List<TAggregationType> aggregations)
       throws StatementExecutionException, IoTDBConnectionException;
 
diff --git a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index 2f5c017540..8a54038999 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -421,6 +421,10 @@ public interface ISessionPool {
   SessionDataSetWrapper executeLastDataQuery(List<String> paths)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  SessionDataSetWrapper executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException;
+
   SessionDataSetWrapper executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)
       throws StatementExecutionException, IoTDBConnectionException;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 1ed59f5d0d..1402b7040b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -861,6 +862,36 @@ public class Session implements ISession {
     return executeLastDataQuery(paths, time, queryTimeoutInMs);
   }
 
+  @Override
+  public SessionDataSet executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException {
+    Pair<SessionDataSet, TEndPoint> pair;
+    try {
+      pair =
+          getSessionConnection(device)
+              .executeLastDataQueryForOneDevice(db, device, sensors, queryTimeoutInMs);
+      if (pair.right != null) {
+        handleRedirection(device, pair.right);
+      }
+      return pair.left;
+    } catch (IoTDBConnectionException e) {
+      if (enableRedirection
+          && !deviceIdToEndpoint.isEmpty()
+          && deviceIdToEndpoint.get(device) != null) {
+        logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(device));
+        deviceIdToEndpoint.remove(device);
+
+        // reconnect with default connection
+        return defaultSessionConnection.executeLastDataQueryForOneDevice(
+                db, device, sensors, queryTimeoutInMs)
+            .left;
+      } else {
+        throw e;
+      }
+    }
+  }
+
   @Override
   public SessionDataSet executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 6cbd1dc6b2..dea5ebf7b9 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -63,6 +64,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -451,6 +453,53 @@ public class SessionConnection {
         execResp.moreData);
   }
 
+  protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors, long timeOut)
+      throws StatementExecutionException, IoTDBConnectionException {
+    TSFastLastDataQueryForOneDeviceReq req =
+        new TSFastLastDataQueryForOneDeviceReq(sessionId, db, device, sensors, statementId);
+    req.setFetchSize(session.fetchSize);
+    req.setEnableRedirectQuery(enableRedirect);
+    req.setLegalPathNodes(true);
+    req.setTimeout(timeOut);
+    TSExecuteStatementResp tsExecuteStatementResp = null;
+    TEndPoint redirectedEndPoint = null;
+    try {
+      tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
+      RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+    } catch (RedirectException e) {
+      redirectedEndPoint = e.getEndPoint();
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          req.setSessionId(sessionId);
+          req.setStatementId(statementId);
+          tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+
+    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
+    return new Pair<>(
+        new SessionDataSet(
+            "",
+            tsExecuteStatementResp.getColumns(),
+            tsExecuteStatementResp.getDataTypeList(),
+            tsExecuteStatementResp.columnNameIndexMap,
+            tsExecuteStatementResp.getQueryId(),
+            statementId,
+            client,
+            sessionId,
+            tsExecuteStatementResp.queryResult,
+            tsExecuteStatementResp.isIgnoreTimeStamp(),
+            tsExecuteStatementResp.moreData),
+        redirectedEndPoint);
+  }
+
   protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut)
       throws StatementExecutionException, IoTDBConnectionException, RedirectException {
     TSLastDataQueryReq tsLastDataQueryReq =
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 369f723f2d..110165ca33 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2584,6 +2584,30 @@ public class SessionPool implements ISessionPool {
     return null;
   }
 
+  @Override
+  public SessionDataSetWrapper executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      ISession session = getSession();
+      try {
+        SessionDataSet resp = session.executeLastDataQueryForOneDevice(db, device, sensors);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        logger.warn("executeLastDataQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // never go here
+    return null;
+  }
+
   @Override
   public SessionDataSetWrapper executeAggregationQuery(
       List<String> paths, List<TAggregationType> aggregations)
