This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 38cc84b784d2e458091d7e0798be3aff82d42fe6 Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 14 10:07:34 2022 +0800 change interface name --- .../main/java/org/apache/iotdb/SessionExample.java | 5 ++-- .../iotdb/session/it/IoTDBFetchWindowSetIT.java | 2 +- .../db/mpp/plan/parser/StatementGenerator.java | 4 ++-- .../service/thrift/impl/ClientRPCServiceImpl.java | 27 ++++++++++------------ .../db/service/thrift/impl/TSServiceImpl.java | 6 ++--- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 2 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 10 ++++---- .../java/org/apache/iotdb/session/ISession.java | 2 +- .../java/org/apache/iotdb/session/Session.java | 4 ++-- .../apache/iotdb/session/SessionConnection.java | 24 +++++++++++-------- thrift/src/main/thrift/client.thrift | 8 +++---- 11 files changed, 48 insertions(+), 46 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index fbe0c63c8a..8760400c91 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -74,8 +74,9 @@ public class SessionExample { List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1"); List<Integer> indexes = Arrays.asList(1, 3, 5, 7); - List<SessionDataSet> windowSet = session.fetchWindowSet(queryPaths, null, 1, 40, 2, 2, indexes); - for (SessionDataSet window : windowSet) { + List<SessionDataSet> windowBatch = + session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes); + for (SessionDataSet window : windowBatch) { System.out.println(window.getColumnNames()); while (window.hasNext()) { System.out.println(window.next()); diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java index 1e31bd5c83..5c56924b03 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBFetchWindowSetIT.java @@ -172,7 +172,7 @@ public class IoTDBFetchWindowSetIT { List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1"); List<Integer> indexes = Arrays.asList(1, 3, 5, 7); List<SessionDataSet> windowSet = - session.fetchWindowSet(queryPaths, null, 0, 20, 2, 2, indexes); + session.fetchWindowBatch(queryPaths, null, 0, 20, 2, 2, indexes); Assert.assertEquals(indexes.size(), windowSet.size()); for (SessionDataSet window : windowSet) { while (window.hasNext()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java index dadd1b3466..d0b5f5167b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java @@ -70,7 +70,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; @@ -179,7 +179,7 @@ public class StatementGenerator { return lastQueryStatement; } - public static Statement createStatement(TSFetchWindowSetReq fetchWindowSetReq, ZoneId zoneId) + public static Statement createStatement(TSFetchWindowBatchReq fetchWindowSetReq) throws IllegalPathException { FetchWindowSetStatement statement = new FetchWindowSetStatement(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 1d7d875999..61363a1d6c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -88,8 +88,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -132,7 +132,6 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.AUDIT_LOGGER; import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG; import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION; import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER; -import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER; import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; @@ -421,19 +420,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } @Override - public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException { + public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException { if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { - return RpcUtils.getTSFetchWindowSetResp(getNotLoggedInStatus()); + return RpcUtils.getTSFetchWindowBatchResp(getNotLoggedInStatus()); } long startTime = System.currentTimeMillis(); try { - Statement s = - StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId()); + Statement s = StatementGenerator.createStatement(req); // permission check TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession()); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return RpcUtils.getTSFetchWindowSetResp(status); + return RpcUtils.getTSFetchWindowBatchResp(status); } QUERY_FREQUENCY_RECORDER.incrementAndGet(); @@ -458,14 +456,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { - TSFetchWindowSetResp resp = - createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId); - resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution)); + TSFetchWindowBatchResp resp = + createTSFetchWindowBatchResp(queryExecution.getDatasetHeader()); + resp.setWindowBatch(QueryDataSetUtils.convertTsBlocksToWindowBatch(queryExecution)); return resp; } } catch (Exception e) { // TODO call the coordinator to release query resource - return RpcUtils.getTSFetchWindowSetResp( + return RpcUtils.getTSFetchWindowBatchResp( onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); } finally { addOperationLatency(Operation.EXECUTE_QUERY, startTime); @@ -1798,12 +1796,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } - private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) { - TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS); + private TSFetchWindowBatchResp createTSFetchWindowBatchResp(DatasetHeader header) { + TSFetchWindowBatchResp resp = RpcUtils.getTSFetchWindowBatchResp(TSStatusCode.SUCCESS_STATUS); resp.setColumnNameList(header.getRespColumns()); resp.setColumnTypeList(header.getRespDataTypeList()); resp.setColumnNameIndexMap(header.getColumnNameIndexMap()); - resp.setQueryId(queryId); return resp; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 9dd837b131..7470898fac 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -103,8 +103,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -279,7 +279,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler { } @Override - public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException { + public TSFetchWindowBatchResp fetchWindowBatch(TSFetchWindowBatchReq req) throws TException { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 9ac3cbd404..d053e4b953 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -400,7 +400,7 @@ public class QueryDataSetUtils { return res; } - public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution) + public static List<List<ByteBuffer>> convertTsBlocksToWindowBatch(IQueryExecution queryExecution) throws IoTDBException { List<List<ByteBuffer>> windowSet = new ArrayList<>(); diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index cbedcf75e2..01668c4e24 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -25,7 +25,7 @@ import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp; import java.lang.reflect.Proxy; import java.text.SimpleDateFormat; @@ -226,16 +226,16 @@ public class RpcUtils { return resp; } - public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatus status) { - TSFetchWindowSetResp resp = new TSFetchWindowSetResp(); + public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatus status) { + TSFetchWindowBatchResp resp = new TSFetchWindowBatchResp(); TSStatus tsStatus = new TSStatus(status); resp.setStatus(tsStatus); return resp; } - public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatusCode tsStatusCode) { + public static TSFetchWindowBatchResp getTSFetchWindowBatchResp(TSStatusCode tsStatusCode) { TSStatus status = getStatus(tsStatusCode); - return getTSFetchWindowSetResp(status); + return getTSFetchWindowBatchResp(status); } public static final String DEFAULT_TIME_FORMAT = "default"; diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java index 1b140e0d9f..4e8b42a864 100644 --- a/session/src/main/java/org/apache/iotdb/session/ISession.java +++ b/session/src/main/java/org/apache/iotdb/session/ISession.java @@ -441,7 +441,7 @@ public interface ISession extends AutoCloseable { TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException; - List<SessionDataSet> fetchWindowSet( + List<SessionDataSet> fetchWindowBatch( List<String> queryPaths, String functionName, long startTime, diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 68e35b367b..5523f9ea73 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -3264,7 +3264,7 @@ public class Session implements ISession { } @Override - public List<SessionDataSet> fetchWindowSet( + public List<SessionDataSet> fetchWindowBatch( List<String> queryPaths, String functionName, long startTime, @@ -3273,7 +3273,7 @@ public class Session implements ISession { long slidingStep, List<Integer> indexes) throws StatementExecutionException { - return defaultSessionConnection.fetchWindowSet( + return defaultSessionConnection.fetchWindowBatch( queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes); } diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 6966b89ae8..1cfa1666c3 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -39,8 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowBatchResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; @@ -485,7 +485,7 @@ public class SessionConnection { tsExecuteStatementResp.isIgnoreTimeStamp()); } - public List<SessionDataSet> fetchWindowSet( + public List<SessionDataSet> fetchWindowBatch( List<String> queryPaths, String functionName, long startTime, @@ -494,31 +494,35 @@ public class SessionConnection { long slidingStep, List<Integer> indexes) throws StatementExecutionException { - TSFetchWindowSetReq req = - new TSFetchWindowSetReq( + TSFetchWindowBatchReq req = + new TSFetchWindowBatchReq( sessionId, statementId, queryPaths, new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes)); - TSFetchWindowSetResp resp; + if (functionName != null) { + req.setFunctionName(functionName); + } + + TSFetchWindowBatchResp resp; try { - resp = client.fetchWindowSet(req); + resp = client.fetchWindowBatch(req); RpcUtils.verifySuccess(resp.getStatus()); } catch (TException e) { throw new StatementExecutionException(""); } List<SessionDataSet> windowSet = new ArrayList<>(); - for (List<ByteBuffer> queryResult : resp.getQueryResultList()) { + for (List<ByteBuffer> window : resp.getWindowBatch()) { SessionDataSet sessionDataSet = new SessionDataSet( resp.columnNameList, resp.columnTypeList, resp.columnNameIndexMap, - resp.queryId, + -1, statementId, sessionId, - queryResult); + window); windowSet.add(sessionDataSet); } return windowSet; diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift index 9474d0645b..6d5f6cfc11 100644 --- a/thrift/src/main/thrift/client.thrift +++ b/thrift/src/main/thrift/client.thrift @@ -421,7 +421,7 @@ struct TGroupByTimeParameter { 5: required list<i32> indexes } -struct TSFetchWindowSetReq { +struct TSFetchWindowBatchReq { 1: required i64 sessionId 2: required i64 statementId 3: required list<string> queryPaths @@ -429,12 +429,12 @@ struct TSFetchWindowSetReq { 5: required TGroupByTimeParameter groupByTimeParameter } -struct TSFetchWindowSetResp { +struct TSFetchWindowBatchResp { 1: required common.TSStatus status 2: required list<string> columnNameList 3: required list<string> columnTypeList 4: required map<string, i32> columnNameIndexMap - 5: required list<TSQueryDataSet> windowSet + 5: required list<list<binary>> windowBatch } // The sender and receiver need to check some info to confirm validity @@ -585,5 +585,5 @@ service IClientRPCService { TSConnectionInfoResp fetchAllConnectionsInfo(); - TSFetchWindowSetResp fetchWindowSet(1:TSFetchWindowSetReq req); + TSFetchWindowBatchResp fetchWindowBatch(1:TSFetchWindowBatchReq req); } \ No newline at end of file
