This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch c-ger-p in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eeb7dce5d1c1a1c7d8008c8dcbc2d9507435f485 Author: Caideyipi <[email protected]> AuthorDate: Thu Apr 16 09:54:47 2026 +0800 part --- .../protocol/rest/v2/impl/RestApiServiceImpl.java | 1 - .../protocol/thrift/impl/ClientRPCServiceImpl.java | 1288 ++++++++++---------- 2 files changed, 629 insertions(+), 660 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 23aca924b7c..594a1d794a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -35,7 +35,6 @@ import org.apache.iotdb.db.protocol.rest.v2.handler.RequestValidationHandler; import org.apache.iotdb.db.protocol.rest.v2.handler.StatementConstructionHandler; import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest; import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest; -import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList; import org.apache.iotdb.db.protocol.rest.v2.model.QueryDataSet; import org.apache.iotdb.db.protocol.rest.v2.model.SQL; import org.apache.iotdb.db.protocol.session.IClientSession; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index fef00a36a41..59b19f9a6cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -37,7 +37,6 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.audit.AuditLogger; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -88,7 +87,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement; @@ -218,7 +216,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private static final Logger SAMPLED_QUERIES_LOGGER = - LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); + LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); private static final Coordinator COORDINATOR = Coordinator.getInstance(); @@ -227,7 +225,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public static final String ERROR_CODE = "error code: "; private static final TSProtocolVersion CURRENT_RPC_VERSION = - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; private static final boolean ENABLE_AUDIT_LOG = config.isEnableAuditLog(); @@ -242,31 +240,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); private static final String NO_QUERY_EXECUTION_ERR_MSG = - "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) - throws IoTDBException, IOException; + throws IoTDBException, IOException; } private static final SelectResult SELECT_RESULT = - (resp, queryExecution, fetchSize) -> { - Pair<List<ByteBuffer>, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); - resp.setQueryResult(pair.left); - return pair.right; - }; + (resp, queryExecution, fetchSize) -> { + Pair<List<ByteBuffer>, Boolean> pair = + QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); + resp.setQueryResult(pair.left); + return pair.right; + }; private static final SelectResult OLD_SELECT_RESULT = - (resp, queryExecution, fetchSize) -> { - Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); - resp.setQueryDataSet(pair.left); - return pair.right; - }; + (resp, queryExecution, fetchSize) -> { + Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); + resp.setQueryDataSet(pair.left); + return pair.right; + }; public ClientRPCServiceImpl() { partitionFetcher = ClusterPartitionFetcher.getInstance(); @@ -274,7 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeStatementInternal( - TSExecuteStatementReq req, SelectResult setResult) { + TSExecuteStatementReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; String statement = req.getStatement(); @@ -293,8 +291,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (s == null) { return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus( - TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); } // permission check TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); @@ -303,8 +301,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); statementType = s.getType(); if (ENABLE_AUDIT_LOG) { AuditLogger.log(statement, s); @@ -316,30 +314,30 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { ExecutionResult result; if (s.shouldSplit()) { result = - executeBatchStatement( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - true); + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); } else { result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { finished = true; return RpcUtils.getTSExecuteStatementResp(result.status); } @@ -364,7 +362,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); + onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); } catch (Error error) { finished = true; t = error; @@ -376,14 +374,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost if (statementType != null) { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); + OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); } if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - statementType, executionTime > 0 ? executionTime : currentOperationCost); + statementType, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -394,7 +392,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeRawDataQueryInternal( - TSRawDataQueryReq req, SelectResult setResult) { + TSRawDataQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -414,8 +412,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("execute Raw Data Query: %s", req), s); @@ -423,15 +421,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -457,7 +455,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -468,13 +466,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -486,7 +484,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeLastDataQueryInternal( - TSLastDataQueryReq req, SelectResult setResult) { + TSLastDataQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -505,8 +503,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("Last Data Query: %s", req), s); @@ -514,15 +512,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -549,7 +547,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -561,13 +559,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -579,7 +577,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeAggregationQueryInternal( - TSAggregationQueryReq req, SelectResult setResult) { + TSAggregationQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -598,21 +596,21 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -639,7 +637,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -651,13 +649,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -669,37 +667,37 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private final List<InputLocation[]> inputLocationList = - Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}); + Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}); @SuppressWarnings("java:S2095") // close() do nothing private List<TsBlock> executeGroupByQueryInternal( - SessionInfo sessionInfo, - String device, - String measurement, - TSDataType dataType, - boolean isAligned, - long startTime, - long endTime, - long interval, - TAggregationType aggregationType, - List<DataRegion> dataRegionList) - throws IllegalPathException { + SessionInfo sessionInfo, + String device, + String measurement, + TSDataType dataType, + boolean isAligned, + long startTime, + long endTime, + long interval, + TAggregationType aggregationType, + List<DataRegion> dataRegionList) + throws IllegalPathException { int dataRegionSize = dataRegionList.size(); if (dataRegionSize != 1) { throw new IllegalArgumentException( - "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); + "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); } Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1); FakedFragmentInstanceContext fragmentInstanceContext = - new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0)); + new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0)); DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); PlanNodeId planNodeId = new PlanNodeId("1"); OperatorContext operatorContext = - new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); + new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE); SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); @@ -708,47 +706,47 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { String aggregationName = SchemaUtils.getBuiltinAggregationName(aggregationType); Aggregator aggregator = - new Aggregator( - AccumulatorFactory.createAccumulator( - aggregationName, - aggregationType, - Collections.singletonList(dataType), - null, - null, - true, - true), - AggregationStep.SINGLE, - inputLocationList); + new Aggregator( + AccumulatorFactory.createAccumulator( + aggregationName, + aggregationType, + Collections.singletonList(dataType), + null, + null, + true, + true), + AggregationStep.SINGLE, + inputLocationList); GroupByTimeParameter groupByTimeParameter = - new GroupByTimeParameter( - startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true); + new GroupByTimeParameter( + startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true); IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType); AbstractSeriesAggregationScanOperator operator; boolean canUseStatistics = - !TSDataType.BLOB.equals(dataType) - || (!TAggregationType.LAST_VALUE.equals(aggregationType) - && !TAggregationType.FIRST_VALUE.equals(aggregationType)); + !TSDataType.BLOB.equals(dataType) + || (!TAggregationType.LAST_VALUE.equals(aggregationType) + && !TAggregationType.FIRST_VALUE.equals(aggregationType)); PartialPath path; if (isAligned) { path = - new AlignedPath( - device.split("\\."), - Collections.singletonList(measurement), - Collections.singletonList(measurementSchema)); + new AlignedPath( + device.split("\\."), + Collections.singletonList(measurement), + Collections.singletonList(measurementSchema)); operator = - new AlignedSeriesAggregationScanOperator( - planNodeId, - (AlignedPath) path, - Ordering.ASC, - scanOptionsBuilder.build(), - operatorContext, - Collections.singletonList(aggregator), - initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), - groupByTimeParameter, - DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - canUseStatistics); + new AlignedSeriesAggregationScanOperator( + planNodeId, + (AlignedPath) path, + Ordering.ASC, + scanOptionsBuilder.build(), + operatorContext, + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + canUseStatistics); } else { String[] splits = device.split("\\."); String[] fullPaths = new String[splits.length + 1]; @@ -756,17 +754,17 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { fullPaths[splits.length] = measurement; path = new MeasurementPath(fullPaths, measurementSchema); operator = - new SeriesAggregationScanOperator( - planNodeId, - path, - Ordering.ASC, - scanOptionsBuilder.build(), - operatorContext, - Collections.singletonList(aggregator), - initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), - groupByTimeParameter, - DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - canUseStatistics); + new SeriesAggregationScanOperator( + planNodeId, + path, + Ordering.ASC, + scanOptionsBuilder.build(), + operatorContext, + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + canUseStatistics); } try { @@ -813,7 +811,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( - final TSFastLastDataQueryForOnePrefixPathReq req) { + final TSFastLastDataQueryForOnePrefixPathReq req) { final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); @@ -821,39 +819,26 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try { final long queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - // 1.1 Map<Device, String[] measurements> ISchemaFetcher.getAllSensors(prefix) ~= 50ms + // 1. Map<Device, String[] measurements> ISchemaFetcher.getAllSensors(prefix) ~= 50ms final PartialPath prefixPath = new PartialPath(req.getPrefixes().toArray(new String[0])); if (prefixPath.hasWildcard()) { RpcUtils.getTSExecuteStatementResp( - new TSStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode()) - .setMessage( - "The \"executeFastLastDataQueryForOnePrefixPath\" dos not support wildcards.")); + new TSStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode()) + .setMessage( + "The \"executeFastLastDataQueryForOnePrefixPath\" dos not support wildcards.")); } final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>(); int sensorNum = 0; - // 1.2 Check permission, the cost is rather low because the req only contains one prefix path - final QueryStatement s = StatementGenerator.createStatement(convert(req)); - final TSStatus status = - AuthorityChecker.checkAuthority( - s, - new TreeAccessCheckContext( - clientSession.getUserId(), - clientSession.getUsername(), - clientSession.getClientAddress())); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return RpcUtils.getTSExecuteStatementResp(status); - } - final String prefixString = prefixPath.toString(); for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) { if (!prefixString.startsWith(region.getDatabaseFullPath()) - && !region.getDatabaseFullPath().startsWith(prefixString)) { + && !region.getDatabaseFullPath().startsWith(prefixString)) { continue; } - sensorNum += region.fillLastQueryMap(prefixPath, resultMap, s.getAuthorityScope()); + sensorNum += region.fillLastQueryMap(prefixPath, resultMap); } // 2.DATA_NODE_SCHEMA_CACHE.getLastCache() @@ -866,27 +851,27 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); for (final Map.Entry<PartialPath, Map<String, TimeValuePair>> device2MeasurementLastEntry : - resultMap.entrySet()) { + resultMap.entrySet()) { final String deviceWithSeparator = - device2MeasurementLastEntry.getKey() + TsFileConstant.PATH_SEPARATOR; + device2MeasurementLastEntry.getKey() + TsFileConstant.PATH_SEPARATOR; for (final Map.Entry<String, TimeValuePair> measurementLastEntry : - device2MeasurementLastEntry.getValue().entrySet()) { + device2MeasurementLastEntry.getValue().entrySet()) { final TimeValuePair tvPair = measurementLastEntry.getValue(); if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) { LastQueryUtil.appendLastValue( - builder, - tvPair.getTimestamp(), - new Binary( - deviceWithSeparator + measurementLastEntry.getKey(), - TSFileConfig.STRING_CHARSET), - tvPair.getValue().getStringValue(), - tvPair.getValue().getDataType().name()); + builder, + tvPair.getTimestamp(), + new Binary( + deviceWithSeparator + measurementLastEntry.getKey(), + TSFileConfig.STRING_CHARSET), + tvPair.getValue().getStringValue(), + tvPair.getValue().getDataType().name()); } } } final TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); if (builder.isEmpty()) { resp.setQueryResult(Collections.emptyList()); @@ -898,17 +883,17 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } catch (final Exception e) { return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } } private TSLastDataQueryReq convert(final TSFastLastDataQueryForOnePrefixPathReq req) { TSLastDataQueryReq tsLastDataQueryReq = - new TSLastDataQueryReq( - req.sessionId, - Collections.singletonList(String.join(".", req.getPrefixes()) + ".**"), - Long.MIN_VALUE, - req.statementId); + new TSLastDataQueryReq( + req.sessionId, + Collections.singletonList(String.join(".", req.getPrefixes()) + ".**"), + Long.MIN_VALUE, + req.statementId); tsLastDataQueryReq.setFetchSize(req.fetchSize); tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery); tsLastDataQueryReq.setLegalPathNodes(true); @@ -918,7 +903,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( - TSFastLastDataQueryForOneDeviceReq req) { + TSFastLastDataQueryForOneDeviceReq req) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -929,20 +914,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long startTime = System.nanoTime(); Throwable t = null; try { - // Place the permission check first - final QueryStatement s = StatementGenerator.createStatement(convert(req)); - // permission check - final TSStatus status = - AuthorityChecker.checkAuthority( - s, - new TreeAccessCheckContext( - clientSession.getUserId(), - clientSession.getUsername(), - clientSession.getClientAddress())); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return RpcUtils.getTSExecuteStatementResp(status); - } - String db; String deviceId; PartialPath devicePath; @@ -960,18 +931,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } DataPartitionQueryParam queryParam = - new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true); + new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true); DataPartition dataPartition = - partitionFetcher.getDataPartitionWithUnclosedTimeRange( - Collections.singletonMap(db, Collections.singletonList(queryParam))); + partitionFetcher.getDataPartitionWithUnclosedTimeRange( + Collections.singletonMap(db, Collections.singletonList(queryParam))); List<TRegionReplicaSet> regionReplicaSets = - dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null); + dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null); // no valid DataRegion if (regionReplicaSets.isEmpty() - || regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0)) { - final TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + || regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0)) { + TSExecuteStatementResp resp = + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); resp.setQueryResult(Collections.emptyList()); finished = true; @@ -980,67 +951,58 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } - final TEndPoint lastRegionLeader = - regionReplicaSets - .get(regionReplicaSets.size() - 1) - .dataNodeLocations - .get(0) - .mPPDataExchangeEndPoint; + TEndPoint lastRegionLeader = + regionReplicaSets + .get(regionReplicaSets.size() - 1) + .dataNodeLocations + .get(0) + .mPPDataExchangeEndPoint; // the device's dataRegion's leader of the latest time partition is on current node, may can // read directly from cache if (isSameNode(lastRegionLeader)) { // the device's all dataRegions' leader are on current node, can use null entry in cache - final boolean canUseNullEntry = - regionReplicaSets.stream() - .limit(regionReplicaSets.size() - 1L) - .allMatch( - regionReplicaSet -> - isSameNode( - regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint)); - final int sensorNum = req.sensors.size(); - final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); + boolean canUseNullEntry = + regionReplicaSets.stream() + .limit(regionReplicaSets.size() - 1L) + .allMatch( + regionReplicaSet -> + isSameNode( + regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint)); + int sensorNum = req.sensors.size(); + TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); boolean allCached = true; - - PathPatternTree queryTree = new PathPatternTree(); - for (final String sensor : req.sensors) { - final MeasurementPath fullPath; + for (String sensor : req.sensors) { + PartialPath fullPath; if (req.isLegalPathNodes()) { - fullPath = devicePath.concatAsMeasurementPath(sensor); + fullPath = devicePath.concatNode(sensor); } else { - fullPath = devicePath.concatAsMeasurementPath((new PartialPath(sensor)).getFullPath()); + fullPath = devicePath.concatNode((new PartialPath(sensor)).getFullPath()); } - queryTree.appendPathPattern(fullPath); - } - queryTree.constructTree(); - queryTree = s.getAuthorityScope().intersectWithFullPathPrefixTree(queryTree); - - if (!queryTree.isEmpty()) { - for (final MeasurementPath fullPath : queryTree.getAllPathPatterns(true)) { - final TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath); - if (timeValuePair == null) { + TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath); + if (timeValuePair == null) { + allCached = false; + break; + } else if (timeValuePair.getValue() == null) { + // there is no data for this sensor + if (!canUseNullEntry) { allCached = false; break; - } else if (timeValuePair.getValue() == null) { - // there is no data for this sensor - if (!canUseNullEntry) { - allCached = false; - break; - } + } } else { // we don't consider TTL LastQueryUtil.appendLastValue( - builder, - timeValuePair.getTimestamp(), - new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), - timeValuePair.getValue().getStringValue(), - timeValuePair.getValue().getDataType().name()); + builder, + timeValuePair.getTimestamp(), + new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), + timeValuePair.getValue().getStringValue(), + timeValuePair.getValue().getDataType().name()); } } // cache hit if (allCached) { TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); if (builder.isEmpty()) { resp.setQueryResult(Collections.emptyList()); @@ -1054,24 +1016,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + // cache miss + Statement s = StatementGenerator.createStatement(convert(req)); + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } + quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("Last Data Query: %s", req), s); } // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -1086,11 +1056,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp = createResponse(queryExecution.getDatasetHeader(), queryId); TSStatus tsstatus = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); tsstatus.setRedirectNode( - regionReplicaSets - .get(regionReplicaSets.size() - 1) - .dataNodeLocations - .get(0) - .clientRpcEndPoint); + regionReplicaSets + .get(regionReplicaSets.size() - 1) + .dataNodeLocations + .get(0) + .clientRpcEndPoint); resp.setStatus(tsstatus); finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize); resp.setMoreData(!finished); @@ -1105,7 +1075,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -1117,13 +1087,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -1139,7 +1109,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { paths.add(req.deviceId + "." + sensor); } TSLastDataQueryReq tsLastDataQueryReq = - new TSLastDataQueryReq(req.sessionId, paths, Long.MIN_VALUE, req.statementId); + new TSLastDataQueryReq(req.sessionId, paths, Long.MIN_VALUE, req.statementId); tsLastDataQueryReq.setFetchSize(req.fetchSize); tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery); tsLastDataQueryReq.setLegalPathNodes(req.legalPathNodes); @@ -1148,7 +1118,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private static void sampleForCacheHitFastLastDataQueryForOneDevice( - TSFastLastDataQueryForOneDeviceReq req) { + TSFastLastDataQueryForOneDeviceReq req) { // only sample successful query if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled String queryRequest = getContentOfTSFastLastDataQueryForOneDeviceReq(req); @@ -1197,7 +1167,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<List<ByteBuffer>, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); + QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); List<ByteBuffer> result = pair.left; finished = pair.right; boolean hasResultSet = !result.isEmpty(); @@ -1211,7 +1181,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSFetchResultsResp( - onQueryException(e, getContentOfRequest(req, queryExecution))); + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { finished = true; t = error; @@ -1223,13 +1193,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.FETCH_RESULTS, statementType, currentOperationCost); + OperationType.FETCH_RESULTS, statementType, currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } @@ -1241,13 +1211,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req); BasicOpenSessionResp openSessionResp = - SESSION_MANAGER.login( - SESSION_MANAGER.getCurrSession(), - req.username, - req.password, - req.zoneId, - req.client_protocol, - clientVersion); + SESSION_MANAGER.login( + SESSION_MANAGER.getCurrSession(), + req.username, + req.password, + req.zoneId, + req.client_protocol, + clientVersion); TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage()); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(openSessionResp.getSessionId()); @@ -1264,10 +1234,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( - !SESSION_MANAGER.closeSession( - SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) - ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN) - : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + !SESSION_MANAGER.closeSession( + SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) + ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN) + : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } @Override @@ -1279,12 +1249,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus closeOperation(TSCloseOperationReq req) { return SESSION_MANAGER.closeOperation( - SESSION_MANAGER.getCurrSession(), - req.queryId, - req.statementId, - req.isSetStatementId(), - req.isSetQueryId(), - COORDINATOR::cleanupQueryExecution); + SESSION_MANAGER.getCurrSession(), + req.queryId, + req.statementId, + req.isSetStatementId(), + req.isSetQueryId(), + COORDINATOR::cleanupQueryExecution); } @Override @@ -1292,13 +1262,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try { ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId(); return new TSGetTimeZoneResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), - zoneId != null ? zoneId.toString() : "Unknown time zone"); + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + zoneId != null ? zoneId.toString() : "Unknown time zone"); } catch (Exception e) { return new TSGetTimeZoneResp( - onNpeOrUnexpectedException( - e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR), - "Unknown time zone"); + onNpeOrUnexpectedException( + e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR), + "Unknown time zone"); } } @@ -1309,7 +1279,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR); + e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR); } } @@ -1324,12 +1294,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME); properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME); properties.setTimestampPrecision( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); properties.setMaxConcurrentClientNum( - IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum()); + IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum()); properties.setIsReadOnly(CommonDescriptor.getInstance().getConfig().isReadOnly()); properties.setThriftMaxFrameSize( - IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()); + IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()); return properties; } @@ -1355,20 +1325,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.SET_STORAGE_GROUP, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR); } } @@ -1396,20 +1366,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1425,7 +1395,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementAlias( - PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAlias())); + PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAlias())); req.setMeasurements(PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurements())); @@ -1433,9 +1403,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { CreateAlignedTimeSeriesStatement statement = StatementGenerator.createStatement(req); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "create aligned timeseries %s.%s", req.getPrefixPath(), req.getMeasurements()), - statement); + String.format( + "create aligned timeseries %s.%s", req.getPrefixPath(), req.getMeasurements()), + statement); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1446,20 +1416,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1475,16 +1445,16 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementAliasList( - PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAliasList())); + PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAliasList())); // Step 1: transfer from CreateMultiTimeSeriesReq to Statement CreateMultiTimeSeriesStatement statement = StatementGenerator.createStatement(req); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "create %s timeseries, the first is %s", - req.getPaths().size(), req.getPaths().get(0)), - statement); + String.format( + "create %s timeseries, the first is %s", + req.getPaths().size(), req.getPaths().get(0)), + statement); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1495,20 +1465,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_MULTI_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1524,7 +1494,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 1: transfer from DeleteStorageGroupsReq to Statement DeleteTimeSeriesStatement statement = - StatementGenerator.createDeleteTimeSeriesStatement(path); + StatementGenerator.createDeleteTimeSeriesStatement(path); // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1535,20 +1505,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1578,20 +1548,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1600,7 +1570,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) { return new TSFetchMetadataResp( - RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, "Fetch Metadata is not supported.")); + RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, "Fetch Metadata is not supported.")); } @Override @@ -1628,7 +1598,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); if (s == null) { return RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); + TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); } // permission check TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); @@ -1637,8 +1607,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(statement, s); @@ -1652,38 +1622,38 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { ExecutionResult result; if (s.shouldSplit()) { result = - executeBatchStatement( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); } else { result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); } results.add(result.status); } catch (Exception e) { LOGGER.warn("Error occurred when executing executeBatchStatement: ", e); TSStatus status = - onQueryException( - e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT); + onQueryException( + e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT); isAllSuccessful = false; results.add(status); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_STATEMENT, type, System.nanoTime() - t2); + OperationType.EXECUTE_STATEMENT, type, System.nanoTime() - t2); if (quota != null) { quota.close(); } @@ -1691,12 +1661,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); + OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); } return isAllSuccessful - ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully") - : RpcUtils.getStatus(results); + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully") + : RpcUtils.getStatus(results); } @Override @@ -1735,7 +1705,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = - convertTsBlockByFetchSize(queryExecution, req.fetchSize); + convertTsBlockByFetchSize(queryExecution, req.fetchSize); TSQueryDataSet result = pair.left; finished = pair.right; boolean hasResultSet = result.bufferForTime().limit() != 0; @@ -1749,7 +1719,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSFetchResultsResp( - onQueryException(e, getContentOfRequest(req, queryExecution))); + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { finished = true; t = error; @@ -1761,13 +1731,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.FETCH_RESULTS, statementType, currentOperationCost); + OperationType.FETCH_RESULTS, statementType, currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } @@ -1787,7 +1757,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); @@ -1798,11 +1768,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPaths.get(0), req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPaths.get(0), req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -1812,31 +1782,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORDS, - StatementType.BATCH_INSERT_ROWS.name(), - System.nanoTime() - t1); + OperationType.INSERT_RECORDS, + StatementType.BATCH_INSERT_ROWS.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1856,7 +1826,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsOfOneDeviceReq to Statement InsertRowsOfOneDeviceStatement statement = StatementGenerator.createStatement(req); @@ -1867,11 +1837,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPath, req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPath, req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -1881,31 +1851,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE.name(), - System.nanoTime() - t1); + OperationType.INSERT_RECORDS_OF_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1925,7 +1895,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertStringRecordsOfOneDeviceReq to Statement InsertRowsOfOneDeviceStatement statement = StatementGenerator.createStatement(req); @@ -1936,11 +1906,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPath, req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPath, req.getTimestamps().get(0)), + statement, + true); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1949,34 +1919,34 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException( - e, OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); + e, OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, - OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, - TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, + OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, + TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE.name(), - System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2005,10 +1975,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), - statement, - true); + String.format( + "insertRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), + statement, + true); } // permission check @@ -2018,29 +1988,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2059,7 +2029,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertTabletsReq to Statement InsertMultiTabletsStatement statement = StatementGenerator.createStatement(req); @@ -2075,31 +2045,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_TABLETS, - StatementType.MULTI_BATCH_INSERT.name(), - System.nanoTime() - t1); + OperationType.INSERT_TABLETS, + StatementType.MULTI_BATCH_INSERT.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2133,29 +2103,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2175,7 +2145,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); InsertRowsStatement statement = StatementGenerator.createStatement(req); // return success when this statement is empty because server doesn't need to execute it @@ -2185,11 +2155,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPaths.get(0), req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPaths.get(0), req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -2199,30 +2169,30 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_STRING_RECORDS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORDS, - StatementType.BATCH_INSERT_ROWS.name(), - System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORDS, + StatementType.BATCH_INSERT_ROWS.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2290,20 +2260,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2354,20 +2324,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_SCHEMA_TEMPLATE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2376,13 +2346,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) { return RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); + TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); } @Override public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) { return RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); + TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); } @Override @@ -2403,9 +2373,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Statement statement = StatementGenerator.createStatement(req); if (statement == null) { resp.setStatus( - RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, - TemplateQueryType.values()[req.getQueryType()].name() + "has not been supported.")); + RpcUtils.getStatus( + TSStatusCode.UNSUPPORTED_OPERATION, + TemplateQueryType.values()[req.getQueryType()].name() + "has not been supported.")); return resp; } switch (TemplateQueryType.values()[req.getQueryType()]) { @@ -2427,8 +2397,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return executeTemplateQueryStatement(statement, req, resp); } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2449,8 +2419,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2474,22 +2444,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (IoTDBDescriptor.getInstance().getConfig().getDataNodeId() == nodeId) { resp.setContent( - ConfigurationFileUtils.readConfigFileContent( - IoTDBDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME))); + ConfigurationFileUtils.readConfigFileContent( + IoTDBDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME))); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); return resp; } try (ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { return client.showConfiguration(nodeId); } catch (ClientManagerException e) { throw new TException(e); } } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2497,7 +2467,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSQueryTemplateResp executeTemplateQueryStatement( - Statement statement, TSQueryTemplateReq req, TSQueryTemplateResp resp) { + Statement statement, TSQueryTemplateReq req, TSQueryTemplateResp resp) { long startTime = System.nanoTime(); OperationQuota quota = null; try { @@ -2510,8 +2480,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("execute Query: %s", statement), statement); @@ -2519,18 +2489,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult executionResult = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - null, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - true); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + null, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); if (executionResult.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && executionResult.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + && executionResult.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { resp.setStatus(executionResult.status); return resp; } @@ -2560,13 +2530,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } catch (Exception e) { resp.setStatus( - onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); + onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); return null; } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_STATEMENT, - statement.getType().name(), - System.nanoTime() - startTime); + OperationType.EXECUTE_STATEMENT, + statement.getType().name(), + System.nanoTime() - startTime); SESSION_MANAGER.updateIdleTime(); } } @@ -2586,8 +2556,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format("set device template %s.%s", req.getTemplateName(), req.getPrefixPath()), - statement); + String.format("set device template %s.%s", req.getTemplateName(), req.getPrefixPath()), + statement); } // permission check @@ -2599,20 +2569,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2633,9 +2603,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "unset device template %s from %s", req.getTemplateName(), req.getPrefixPath()), - statement); + String.format( + "unset device template %s from %s", req.getTemplateName(), req.getPrefixPath()), + statement); } // permission check @@ -2647,20 +2617,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2692,18 +2662,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2711,7 +2681,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus createTimeseriesUsingSchemaTemplate(TCreateTimeseriesUsingSchemaTemplateReq req) - throws TException { + throws TException { try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { @@ -2720,11 +2690,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 1: transfer to Statement BatchActivateTemplateStatement statement = - StatementGenerator.createBatchActivateTemplateStatement(req.getDevicePathList()); + StatementGenerator.createBatchActivateTemplateStatement(req.getDevicePathList()); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format("batch activate device template %s", req.getDevicePathList()), statement); + String.format("batch activate device template %s", req.getDevicePathList()), statement); } // permission check @@ -2736,20 +2706,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2758,12 +2728,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus handshake(final TSyncIdentityInfo info) throws TException { return PipeDataNodeAgent.receiver() - .legacy() - .handshake( - info, - SESSION_MANAGER.getCurrSession().getClientAddress(), - partitionFetcher, - schemaFetcher); + .legacy() + .handshake( + info, + SESSION_MANAGER.getCurrSession().getClientAddress(), + partitionFetcher, + schemaFetcher); } @Override @@ -2773,7 +2743,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff) - throws TException { + throws TException { return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff); } @@ -2819,10 +2789,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertStringRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), - statement, - true); + String.format( + "insertStringRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), + statement, + true); } // Permission check @@ -2832,29 +2802,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: Call the coordinator final long queryId = SESSION_MANAGER.requestQueryId(); final ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2877,8 +2847,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { protected TSStatus getNotLoggedInStatus() { return RpcUtils.getStatus( - TSStatusCode.NOT_LOGIN, - "Log in failed. Either you are not authorized or the session has timed out."); + TSStatusCode.NOT_LOGIN, + "Log in failed. Either you are not authorized or the session has timed out."); } private String checkIdentifierAndRemoveBackQuotesIfNecessary(String identifier) { @@ -2910,74 +2880,74 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { * @return the execution result */ private ExecutionResult executeBatchStatement( - final Statement statement, - final long queryId, - final SessionInfo sessionInfo, - final String statementStr, - final IPartitionFetcher partitionFetcher, - final ISchemaFetcher schemaFetcher, - final long timeoutMs, - final boolean userQuery) { + final Statement statement, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final IPartitionFetcher partitionFetcher, + final ISchemaFetcher schemaFetcher, + final long timeoutMs, + final boolean userQuery) { ExecutionResult result = null; final List<? extends Statement> subStatements = statement.getSubStatements(); final int totalSubStatements = subStatements.size(); LOGGER.info( - "Start batch executing {} sub-statement(s) in tree model, queryId: {}", - totalSubStatements, - queryId); + "Start batch executing {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); for (int i = 0; i < totalSubStatements; i++) { final Statement subStatement = subStatements.get(i); LOGGER.info( - "Executing sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); result = - COORDINATOR.executeForTreeModel( - subStatement, - queryId, - sessionInfo, - statementStr, - partitionFetcher, - schemaFetcher, - timeoutMs, - userQuery); + COORDINATOR.executeForTreeModel( + subStatement, + queryId, + sessionInfo, + statementStr, + partitionFetcher, + schemaFetcher, + timeoutMs, + userQuery); // Exit early if any sub-statement execution fails if (result != null - && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final int completed = i + 1; final int remaining = totalSubStatements - completed; final double percentage = (completed * 100.0) / totalSubStatements; LOGGER.warn( - "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", - i + 1, - totalSubStatements, - queryId, - completed, - remaining, - String.format("%.2f", percentage), - result.status.getMessage()); + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); break; } LOGGER.info( - "Successfully executed sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", - totalSubStatements, - queryId); + "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); } return result;
