This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch c-ger-p in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 247e88c24cb06d53a1930615bed09c596341d4ec Author: Caideyipi <[email protected]> AuthorDate: Thu Apr 16 09:55:51 2026 +0800 part --- .../org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 2 +- .../protocol/rest/v2/impl/RestApiServiceImpl.java | 32 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 1204 ++++++++++---------- .../schemaengine/schemaregion/ISchemaRegion.java | 4 +- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 4 +- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 4 +- .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 4 +- 7 files changed, 620 insertions(+), 634 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index e8eca4937d5..61513272c29 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -18,8 +18,8 @@ */ package org.apache.iotdb.db.it; -import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 594a1d794a4..05fc4b80084 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.protocol.rest.v2.handler.RequestValidationHandler; import org.apache.iotdb.db.protocol.rest.v2.handler.StatementConstructionHandler; import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest; import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest; +import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList; import org.apache.iotdb.db.protocol.rest.v2.model.QueryDataSet; import org.apache.iotdb.db.protocol.rest.v2.model.SQL; import org.apache.iotdb.db.protocol.session.IClientSession; @@ -54,7 +55,6 @@ import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.db.utils.CommonUtils; @@ -102,7 +102,7 @@ public class RestApiServiceImpl extends RestApiService { public Response executeFastLastQueryStatement( PrefixPathList prefixPathList, SecurityContext securityContext) { Long queryId = null; - QueryStatement statement = null; + Statement statement = null; boolean finish = false; long startTime = System.nanoTime(); @@ -111,23 +111,7 @@ public class RestApiServiceImpl extends RestApiService { PartialPath prefixPath = new PartialPath(prefixPathList.getPrefixPaths().toArray(new String[0])); -<<<<<<< HEAD:iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>(); -======= - final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>> resultMap = - new HashMap<>(); - - // Check permission, the cost is rather low because the req only contains one prefix path - final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); - final TSLastDataQueryReq tsLastDataQueryReq = - FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList); - statement = StatementGenerator.createStatement(tsLastDataQueryReq); - - final Response response = authorizationHandler.checkAuthority(securityContext, statement); - if (response != null) { - return response; - } ->>>>>>> bb0f13b78bb (Enhance the last query permission && Fixed the rollback version of alter view / table plans && Deleted the unnecessary mods in Tree view deletion (#17465)):external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/impl/RestApiServiceImpl.java final String prefixString = prefixPath.toString(); for (ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) { @@ -135,28 +119,22 @@ public class RestApiServiceImpl extends RestApiService { && !region.getDatabaseFullPath().startsWith(prefixString)) { continue; } -<<<<<<< HEAD:iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java region.fillLastQueryMap(prefixPath, resultMap); -======= - region.fillLastQueryMap(prefixPath, resultMap, statement.getAuthorityScope()); ->>>>>>> bb0f13b78bb (Enhance the last query permission && Fixed the rollback version of alter view / table plans && Deleted the unnecessary mods in Tree view deletion (#17465)):external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/impl/RestApiServiceImpl.java } - // Check cache first -<<<<<<< HEAD:iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java if (!DataNodeSchemaCache.getInstance().getDeviceSchemaCache().getLastCache(resultMap)) { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); TSLastDataQueryReq tsLastDataQueryReq = FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList); statement = StatementGenerator.createStatement(tsLastDataQueryReq); -======= - if (!TableDeviceSchemaCache.getInstance().getLastCache(resultMap)) { ->>>>>>> bb0f13b78bb (Enhance the last query permission && Fixed the rollback version of alter view / table plans && Deleted the unnecessary mods in Tree view deletion (#17465)):external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/impl/RestApiServiceImpl.java if (ExecuteStatementHandler.validateStatement(statement)) { return FastLastHandler.buildErrorResponse(TSStatusCode.EXECUTE_STATEMENT_ERROR); } + Optional.ofNullable(authorizationHandler.checkAuthority(securityContext, statement)) + .ifPresent(Response.class::cast); + queryId = SESSION_MANAGER.requestQueryId(); SessionInfo sessionInfo = SESSION_MANAGER.getSessionInfo(clientSession); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 59b19f9a6cf..8a498fce2e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -216,7 +216,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private static final Logger SAMPLED_QUERIES_LOGGER = - LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); + LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); private static final Coordinator COORDINATOR = Coordinator.getInstance(); @@ -225,7 +225,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public static final String ERROR_CODE = "error code: "; private static final TSProtocolVersion CURRENT_RPC_VERSION = - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; private static final boolean ENABLE_AUDIT_LOG = config.isEnableAuditLog(); @@ -240,31 +240,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); private static final String NO_QUERY_EXECUTION_ERR_MSG = - "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; + "Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log."; @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) - throws IoTDBException, IOException; + throws IoTDBException, IOException; } private static final SelectResult SELECT_RESULT = - (resp, queryExecution, fetchSize) -> { - Pair<List<ByteBuffer>, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); - resp.setQueryResult(pair.left); - return pair.right; - }; + (resp, queryExecution, fetchSize) -> { + Pair<List<ByteBuffer>, Boolean> pair = + QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); + resp.setQueryResult(pair.left); + return pair.right; + }; private static final SelectResult OLD_SELECT_RESULT = - (resp, queryExecution, fetchSize) -> { - Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); - resp.setQueryDataSet(pair.left); - return pair.right; - }; + (resp, queryExecution, fetchSize) -> { + Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); + resp.setQueryDataSet(pair.left); + return pair.right; + }; public ClientRPCServiceImpl() { partitionFetcher = ClusterPartitionFetcher.getInstance(); @@ -272,7 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeStatementInternal( - TSExecuteStatementReq req, SelectResult setResult) { + TSExecuteStatementReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; String statement = req.getStatement(); @@ -291,8 +291,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (s == null) { return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus( - TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); } // permission check TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); @@ -301,8 +301,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); statementType = s.getType(); if (ENABLE_AUDIT_LOG) { AuditLogger.log(statement, s); @@ -314,30 +314,30 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { ExecutionResult result; if (s.shouldSplit()) { result = - executeBatchStatement( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - true); + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); } else { result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { finished = true; return RpcUtils.getTSExecuteStatementResp(result.status); } @@ -362,7 +362,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); + onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); } catch (Error error) { finished = true; t = error; @@ -374,14 +374,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost if (statementType != null) { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); + OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); } if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - statementType, executionTime > 0 ? executionTime : currentOperationCost); + statementType, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -392,7 +392,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeRawDataQueryInternal( - TSRawDataQueryReq req, SelectResult setResult) { + TSRawDataQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -412,8 +412,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("execute Raw Data Query: %s", req), s); @@ -421,15 +421,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -455,7 +455,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -466,13 +466,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -484,7 +484,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeLastDataQueryInternal( - TSLastDataQueryReq req, SelectResult setResult) { + TSLastDataQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -503,8 +503,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("Last Data Query: %s", req), s); @@ -512,15 +512,15 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -547,7 +547,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -559,13 +559,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -577,7 +577,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSExecuteStatementResp executeAggregationQueryInternal( - TSAggregationQueryReq req, SelectResult setResult) { + TSAggregationQueryReq req, SelectResult setResult) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -596,21 +596,21 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -637,7 +637,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -649,13 +649,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } @@ -667,37 +667,37 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private final List<InputLocation[]> inputLocationList = - Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}); + Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}); @SuppressWarnings("java:S2095") // close() do nothing private List<TsBlock> executeGroupByQueryInternal( - SessionInfo sessionInfo, - String device, - String measurement, - TSDataType dataType, - boolean isAligned, - long startTime, - long endTime, - long interval, - TAggregationType aggregationType, - List<DataRegion> dataRegionList) - throws IllegalPathException { + SessionInfo sessionInfo, + String device, + String measurement, + TSDataType dataType, + boolean isAligned, + long startTime, + long endTime, + long interval, + TAggregationType aggregationType, + List<DataRegion> dataRegionList) + throws IllegalPathException { int dataRegionSize = dataRegionList.size(); if (dataRegionSize != 1) { throw new IllegalArgumentException( - "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); + "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); } Filter timeFilter = TimeFilterApi.between(startTime, endTime - 1); FakedFragmentInstanceContext fragmentInstanceContext = - new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0)); + new FakedFragmentInstanceContext(timeFilter, dataRegionList.get(0)); DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); PlanNodeId planNodeId = new PlanNodeId("1"); OperatorContext operatorContext = - new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); + new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE); SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); @@ -706,47 +706,47 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { String aggregationName = SchemaUtils.getBuiltinAggregationName(aggregationType); Aggregator aggregator = - new Aggregator( - AccumulatorFactory.createAccumulator( - aggregationName, - aggregationType, - Collections.singletonList(dataType), - null, - null, - true, - true), - AggregationStep.SINGLE, - inputLocationList); + new Aggregator( + AccumulatorFactory.createAccumulator( + aggregationName, + aggregationType, + Collections.singletonList(dataType), + null, + null, + true, + true), + AggregationStep.SINGLE, + inputLocationList); GroupByTimeParameter groupByTimeParameter = - new GroupByTimeParameter( - startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true); + new GroupByTimeParameter( + startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true); IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType); AbstractSeriesAggregationScanOperator operator; boolean canUseStatistics = - !TSDataType.BLOB.equals(dataType) - || (!TAggregationType.LAST_VALUE.equals(aggregationType) - && !TAggregationType.FIRST_VALUE.equals(aggregationType)); + !TSDataType.BLOB.equals(dataType) + || (!TAggregationType.LAST_VALUE.equals(aggregationType) + && !TAggregationType.FIRST_VALUE.equals(aggregationType)); PartialPath path; if (isAligned) { path = - new AlignedPath( - device.split("\\."), - Collections.singletonList(measurement), - Collections.singletonList(measurementSchema)); + new AlignedPath( + device.split("\\."), + Collections.singletonList(measurement), + Collections.singletonList(measurementSchema)); operator = - new AlignedSeriesAggregationScanOperator( - planNodeId, - (AlignedPath) path, - Ordering.ASC, - scanOptionsBuilder.build(), - operatorContext, - Collections.singletonList(aggregator), - initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), - groupByTimeParameter, - DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - canUseStatistics); + new AlignedSeriesAggregationScanOperator( + planNodeId, + (AlignedPath) path, + Ordering.ASC, + scanOptionsBuilder.build(), + operatorContext, + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + canUseStatistics); } else { String[] splits = device.split("\\."); String[] fullPaths = new String[splits.length + 1]; @@ -754,17 +754,17 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { fullPaths[splits.length] = measurement; path = new MeasurementPath(fullPaths, measurementSchema); operator = - new SeriesAggregationScanOperator( - planNodeId, - path, - Ordering.ASC, - scanOptionsBuilder.build(), - operatorContext, - Collections.singletonList(aggregator), - initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), - groupByTimeParameter, - DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - canUseStatistics); + new SeriesAggregationScanOperator( + planNodeId, + path, + Ordering.ASC, + scanOptionsBuilder.build(), + operatorContext, + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true, sessionInfo.getZoneId()), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + canUseStatistics); } try { @@ -811,7 +811,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( - final TSFastLastDataQueryForOnePrefixPathReq req) { + final TSFastLastDataQueryForOnePrefixPathReq req) { final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); @@ -824,9 +824,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { final PartialPath prefixPath = new PartialPath(req.getPrefixes().toArray(new String[0])); if (prefixPath.hasWildcard()) { RpcUtils.getTSExecuteStatementResp( - new TSStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode()) - .setMessage( - "The \"executeFastLastDataQueryForOnePrefixPath\" dos not support wildcards.")); + new TSStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode()) + .setMessage( + "The \"executeFastLastDataQueryForOnePrefixPath\" dos not support wildcards.")); } final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>(); @@ -835,7 +835,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { final String prefixString = prefixPath.toString(); for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) { if (!prefixString.startsWith(region.getDatabaseFullPath()) - && !region.getDatabaseFullPath().startsWith(prefixString)) { + && !region.getDatabaseFullPath().startsWith(prefixString)) { continue; } sensorNum += region.fillLastQueryMap(prefixPath, resultMap); @@ -851,27 +851,27 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); for (final Map.Entry<PartialPath, Map<String, TimeValuePair>> device2MeasurementLastEntry : - resultMap.entrySet()) { + resultMap.entrySet()) { final String deviceWithSeparator = - device2MeasurementLastEntry.getKey() + TsFileConstant.PATH_SEPARATOR; + device2MeasurementLastEntry.getKey() + TsFileConstant.PATH_SEPARATOR; for (final Map.Entry<String, TimeValuePair> measurementLastEntry : - device2MeasurementLastEntry.getValue().entrySet()) { + device2MeasurementLastEntry.getValue().entrySet()) { final TimeValuePair tvPair = measurementLastEntry.getValue(); if (tvPair != DeviceLastCache.EMPTY_TIME_VALUE_PAIR) { LastQueryUtil.appendLastValue( - builder, - tvPair.getTimestamp(), - new Binary( - deviceWithSeparator + measurementLastEntry.getKey(), - TSFileConfig.STRING_CHARSET), - tvPair.getValue().getStringValue(), - tvPair.getValue().getDataType().name()); + builder, + tvPair.getTimestamp(), + new Binary( + deviceWithSeparator + measurementLastEntry.getKey(), + TSFileConfig.STRING_CHARSET), + tvPair.getValue().getStringValue(), + tvPair.getValue().getDataType().name()); } } } final TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); if (builder.isEmpty()) { resp.setQueryResult(Collections.emptyList()); @@ -883,17 +883,17 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } catch (final Exception e) { return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } } private TSLastDataQueryReq convert(final TSFastLastDataQueryForOnePrefixPathReq req) { TSLastDataQueryReq tsLastDataQueryReq = - new TSLastDataQueryReq( - req.sessionId, - Collections.singletonList(String.join(".", req.getPrefixes()) + ".**"), - Long.MIN_VALUE, - req.statementId); + new TSLastDataQueryReq( + req.sessionId, + Collections.singletonList(String.join(".", req.getPrefixes()) + ".**"), + Long.MIN_VALUE, + req.statementId); tsLastDataQueryReq.setFetchSize(req.fetchSize); tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery); tsLastDataQueryReq.setLegalPathNodes(true); @@ -903,7 +903,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2( - TSFastLastDataQueryForOneDeviceReq req) { + TSFastLastDataQueryForOneDeviceReq req) { boolean finished = false; long queryId = Long.MIN_VALUE; IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -931,18 +931,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } DataPartitionQueryParam queryParam = - new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true); + new DataPartitionQueryParam(deviceId, Collections.emptyList(), true, true); DataPartition dataPartition = - partitionFetcher.getDataPartitionWithUnclosedTimeRange( - Collections.singletonMap(db, Collections.singletonList(queryParam))); + partitionFetcher.getDataPartitionWithUnclosedTimeRange( + Collections.singletonMap(db, Collections.singletonList(queryParam))); List<TRegionReplicaSet> regionReplicaSets = - dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null); + dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceId, null); // no valid DataRegion if (regionReplicaSets.isEmpty() - || regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0)) { + || regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0)) { TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); resp.setQueryResult(Collections.emptyList()); finished = true; @@ -952,23 +952,23 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } TEndPoint lastRegionLeader = - regionReplicaSets - .get(regionReplicaSets.size() - 1) - .dataNodeLocations - .get(0) - .mPPDataExchangeEndPoint; + regionReplicaSets + .get(regionReplicaSets.size() - 1) + .dataNodeLocations + .get(0) + .mPPDataExchangeEndPoint; // the device's dataRegion's leader of the latest time partition is on current node, may can // read directly from cache if (isSameNode(lastRegionLeader)) { // the device's all dataRegions' leader are on current node, can use null entry in cache boolean canUseNullEntry = - regionReplicaSets.stream() - .limit(regionReplicaSets.size() - 1L) - .allMatch( - regionReplicaSet -> - isSameNode( - regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint)); + regionReplicaSets.stream() + .limit(regionReplicaSets.size() - 1L) + .allMatch( + regionReplicaSet -> + isSameNode( + regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint)); int sensorNum = req.sensors.size(); TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); boolean allCached = true; @@ -992,17 +992,17 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } else { // we don't consider TTL LastQueryUtil.appendLastValue( - builder, - timeValuePair.getTimestamp(), - new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), - timeValuePair.getValue().getStringValue(), - timeValuePair.getValue().getDataType().name()); + builder, + timeValuePair.getTimestamp(), + new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET), + timeValuePair.getValue().getStringValue(), + timeValuePair.getValue().getDataType().name()); } } // cache hit if (allCached) { TSExecuteStatementResp resp = - createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); + createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "")); if (builder.isEmpty()) { resp.setQueryResult(Collections.emptyList()); @@ -1025,23 +1025,23 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("Last Data Query: %s", req), s); } // create and cache dataset ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -1056,11 +1056,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp = createResponse(queryExecution.getDatasetHeader(), queryId); TSStatus tsstatus = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); tsstatus.setRedirectNode( - regionReplicaSets - .get(regionReplicaSets.size() - 1) - .dataNodeLocations - .get(0) - .clientRpcEndPoint); + regionReplicaSets + .get(regionReplicaSets.size() - 1) + .dataNodeLocations + .get(0) + .clientRpcEndPoint); resp.setStatus(tsstatus); finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize); resp.setMoreData(!finished); @@ -1075,7 +1075,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( - onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } catch (Error error) { finished = true; t = error; @@ -1087,13 +1087,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); + OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -1109,7 +1109,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { paths.add(req.deviceId + "." + sensor); } TSLastDataQueryReq tsLastDataQueryReq = - new TSLastDataQueryReq(req.sessionId, paths, Long.MIN_VALUE, req.statementId); + new TSLastDataQueryReq(req.sessionId, paths, Long.MIN_VALUE, req.statementId); tsLastDataQueryReq.setFetchSize(req.fetchSize); tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery); tsLastDataQueryReq.setLegalPathNodes(req.legalPathNodes); @@ -1118,7 +1118,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private static void sampleForCacheHitFastLastDataQueryForOneDevice( - TSFastLastDataQueryForOneDeviceReq req) { + TSFastLastDataQueryForOneDeviceReq req) { // only sample successful query if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled String queryRequest = getContentOfTSFastLastDataQueryForOneDeviceReq(req); @@ -1167,7 +1167,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<List<ByteBuffer>, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); + QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); List<ByteBuffer> result = pair.left; finished = pair.right; boolean hasResultSet = !result.isEmpty(); @@ -1181,7 +1181,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSFetchResultsResp( - onQueryException(e, getContentOfRequest(req, queryExecution))); + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { finished = true; t = error; @@ -1193,13 +1193,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.FETCH_RESULTS, statementType, currentOperationCost); + OperationType.FETCH_RESULTS, statementType, currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } @@ -1211,13 +1211,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req); BasicOpenSessionResp openSessionResp = - SESSION_MANAGER.login( - SESSION_MANAGER.getCurrSession(), - req.username, - req.password, - req.zoneId, - req.client_protocol, - clientVersion); + SESSION_MANAGER.login( + SESSION_MANAGER.getCurrSession(), + req.username, + req.password, + req.zoneId, + req.client_protocol, + clientVersion); TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage()); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(openSessionResp.getSessionId()); @@ -1234,10 +1234,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( - !SESSION_MANAGER.closeSession( - SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) - ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN) - : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + !SESSION_MANAGER.closeSession( + SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) + ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN) + : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } @Override @@ -1249,12 +1249,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus closeOperation(TSCloseOperationReq req) { return SESSION_MANAGER.closeOperation( - SESSION_MANAGER.getCurrSession(), - req.queryId, - req.statementId, - req.isSetStatementId(), - req.isSetQueryId(), - COORDINATOR::cleanupQueryExecution); + SESSION_MANAGER.getCurrSession(), + req.queryId, + req.statementId, + req.isSetStatementId(), + req.isSetQueryId(), + COORDINATOR::cleanupQueryExecution); } @Override @@ -1262,13 +1262,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try { ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId(); return new TSGetTimeZoneResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), - zoneId != null ? zoneId.toString() : "Unknown time zone"); + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + zoneId != null ? zoneId.toString() : "Unknown time zone"); } catch (Exception e) { return new TSGetTimeZoneResp( - onNpeOrUnexpectedException( - e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR), - "Unknown time zone"); + onNpeOrUnexpectedException( + e, OperationType.GET_TIME_ZONE, TSStatusCode.GENERATE_TIME_ZONE_ERROR), + "Unknown time zone"); } } @@ -1279,7 +1279,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR); + e, OperationType.SET_TIME_ZONE, TSStatusCode.SET_TIME_ZONE_ERROR); } } @@ -1294,12 +1294,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MAX_TIME); properties.getSupportedTimeAggregationOperations().add(IoTDBConstant.MIN_TIME); properties.setTimestampPrecision( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); properties.setMaxConcurrentClientNum( - IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum()); + IoTDBDescriptor.getInstance().getConfig().getRpcMaxConcurrentClientNum()); properties.setIsReadOnly(CommonDescriptor.getInstance().getConfig().isReadOnly()); properties.setThriftMaxFrameSize( - IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()); + IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()); return properties; } @@ -1325,20 +1325,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.SET_STORAGE_GROUP, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.SET_STORAGE_GROUP, TSStatusCode.EXECUTE_STATEMENT_ERROR); } } @@ -1366,20 +1366,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1395,7 +1395,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementAlias( - PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAlias())); + PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAlias())); req.setMeasurements(PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurements())); @@ -1403,9 +1403,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { CreateAlignedTimeSeriesStatement statement = StatementGenerator.createStatement(req); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "create aligned timeseries %s.%s", req.getPrefixPath(), req.getMeasurements()), - statement); + String.format( + "create aligned timeseries %s.%s", req.getPrefixPath(), req.getMeasurements()), + statement); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1416,20 +1416,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_ALIGNED_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1445,16 +1445,16 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementAliasList( - PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAliasList())); + PathUtils.checkIsLegalSingleMeasurementsAndUpdate(req.getMeasurementAliasList())); // Step 1: transfer from CreateMultiTimeSeriesReq to Statement CreateMultiTimeSeriesStatement statement = StatementGenerator.createStatement(req); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "create %s timeseries, the first is %s", - req.getPaths().size(), req.getPaths().get(0)), - statement); + String.format( + "create %s timeseries, the first is %s", + req.getPaths().size(), req.getPaths().get(0)), + statement); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1465,20 +1465,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_MULTI_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_MULTI_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1494,7 +1494,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 1: transfer from DeleteStorageGroupsReq to Statement DeleteTimeSeriesStatement statement = - StatementGenerator.createDeleteTimeSeriesStatement(path); + StatementGenerator.createDeleteTimeSeriesStatement(path); // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1505,20 +1505,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_TIMESERIES, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1548,20 +1548,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_STORAGE_GROUPS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -1570,7 +1570,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) { return new TSFetchMetadataResp( - RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, "Fetch Metadata is not supported.")); + RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION, "Fetch Metadata is not supported.")); } @Override @@ -1598,7 +1598,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); if (s == null) { return RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); + TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); } // permission check TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); @@ -1607,8 +1607,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); if (ENABLE_AUDIT_LOG) { AuditLogger.log(statement, s); @@ -1622,38 +1622,38 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { ExecutionResult result; if (s.shouldSplit()) { result = - executeBatchStatement( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); } else { result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); } results.add(result.status); } catch (Exception e) { LOGGER.warn("Error occurred when executing executeBatchStatement: ", e); TSStatus status = - onQueryException( - e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT); + onQueryException( + e, "\"" + statement + "\". " + OperationType.EXECUTE_BATCH_STATEMENT); isAllSuccessful = false; results.add(status); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_STATEMENT, type, System.nanoTime() - t2); + OperationType.EXECUTE_STATEMENT, type, System.nanoTime() - t2); if (quota != null) { quota.close(); } @@ -1661,12 +1661,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); + OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); } return isAllSuccessful - ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully") - : RpcUtils.getStatus(results); + ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully") + : RpcUtils.getStatus(results); } @Override @@ -1705,7 +1705,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = - convertTsBlockByFetchSize(queryExecution, req.fetchSize); + convertTsBlockByFetchSize(queryExecution, req.fetchSize); TSQueryDataSet result = pair.left; finished = pair.right; boolean hasResultSet = result.bufferForTime().limit() != 0; @@ -1719,7 +1719,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { finished = true; t = e; return RpcUtils.getTSFetchResultsResp( - onQueryException(e, getContentOfRequest(req, queryExecution))); + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { finished = true; t = error; @@ -1731,13 +1731,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost CommonUtils.addStatementExecutionLatency( - OperationType.FETCH_RESULTS, statementType, currentOperationCost); + OperationType.FETCH_RESULTS, statementType, currentOperationCost); if (finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); CommonUtils.addQueryLatency( - StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } @@ -1757,7 +1757,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); @@ -1768,11 +1768,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPaths.get(0), req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPaths.get(0), req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -1782,31 +1782,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORDS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORDS, - StatementType.BATCH_INSERT_ROWS.name(), - System.nanoTime() - t1); + OperationType.INSERT_RECORDS, + StatementType.BATCH_INSERT_ROWS.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1826,7 +1826,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertRecordsOfOneDeviceReq to Statement InsertRowsOfOneDeviceStatement statement = StatementGenerator.createStatement(req); @@ -1837,11 +1837,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPath, req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPath, req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -1851,31 +1851,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORDS_OF_ONE_DEVICE, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE.name(), - System.nanoTime() - t1); + OperationType.INSERT_RECORDS_OF_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1895,7 +1895,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertStringRecordsOfOneDeviceReq to Statement InsertRowsOfOneDeviceStatement statement = StatementGenerator.createStatement(req); @@ -1906,11 +1906,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPath, req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPath, req.getTimestamps().get(0)), + statement, + true); } // permission check TSStatus status = AuthorityChecker.checkAuthority(statement, clientSession); @@ -1919,34 +1919,34 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException( - e, OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); + e, OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, - OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, - TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, + OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, + TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE.name(), - System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1975,10 +1975,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), - statement, - true); + String.format( + "insertRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), + statement, + true); } // permission check @@ -1988,29 +1988,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2029,7 +2029,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); // Step 1: transfer from TSInsertTabletsReq to Statement InsertMultiTabletsStatement statement = StatementGenerator.createStatement(req); @@ -2045,31 +2045,31 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_TABLETS, - StatementType.MULTI_BATCH_INSERT.name(), - System.nanoTime() - t1); + OperationType.INSERT_TABLETS, + StatementType.MULTI_BATCH_INSERT.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2103,29 +2103,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2145,7 +2145,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // check whether measurement is legal according to syntax convention req.setMeasurementsList( - PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); + PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList())); InsertRowsStatement statement = StatementGenerator.createStatement(req); // return success when this statement is empty because server doesn't need to execute it @@ -2155,11 +2155,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertRecords, first device %s, first time %s", - req.prefixPaths.get(0), req.getTimestamps().get(0)), - statement, - true); + String.format( + "insertRecords, first device %s, first time %s", + req.prefixPaths.get(0), req.getTimestamps().get(0)), + statement, + true); } // permission check @@ -2169,30 +2169,30 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_STRING_RECORDS, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_STRING_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORDS, - StatementType.BATCH_INSERT_ROWS.name(), - System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORDS, + StatementType.BATCH_INSERT_ROWS.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2260,20 +2260,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2324,20 +2324,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_SCHEMA_TEMPLATE, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.CREATE_SCHEMA_TEMPLATE, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2346,13 +2346,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) { return RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); + TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); } @Override public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) { return RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); + TSStatusCode.UNSUPPORTED_OPERATION, "Modify template has not been supported."); } @Override @@ -2373,9 +2373,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Statement statement = StatementGenerator.createStatement(req); if (statement == null) { resp.setStatus( - RpcUtils.getStatus( - TSStatusCode.UNSUPPORTED_OPERATION, - TemplateQueryType.values()[req.getQueryType()].name() + "has not been supported.")); + RpcUtils.getStatus( + TSStatusCode.UNSUPPORTED_OPERATION, + TemplateQueryType.values()[req.getQueryType()].name() + "has not been supported.")); return resp; } switch (TemplateQueryType.values()[req.getQueryType()]) { @@ -2397,8 +2397,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return executeTemplateQueryStatement(statement, req, resp); } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2419,8 +2419,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2444,22 +2444,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (IoTDBDescriptor.getInstance().getConfig().getDataNodeId() == nodeId) { resp.setContent( - ConfigurationFileUtils.readConfigFileContent( - IoTDBDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME))); + ConfigurationFileUtils.readConfigFileContent( + IoTDBDescriptor.getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME))); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); return resp; } try (ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { return client.showConfiguration(nodeId); } catch (ClientManagerException e) { throw new TException(e); } } catch (Exception e) { resp.setStatus( - onNpeOrUnexpectedException( - e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); + onNpeOrUnexpectedException( + e, OperationType.EXECUTE_QUERY_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR)); return resp; } finally { SESSION_MANAGER.updateIdleTime(); @@ -2467,7 +2467,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } private TSQueryTemplateResp executeTemplateQueryStatement( - Statement statement, TSQueryTemplateReq req, TSQueryTemplateResp resp) { + Statement statement, TSQueryTemplateReq req, TSQueryTemplateResp resp) { long startTime = System.nanoTime(); OperationQuota quota = null; try { @@ -2480,8 +2480,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); if (ENABLE_AUDIT_LOG) { AuditLogger.log(String.format("execute Query: %s", statement), statement); @@ -2489,18 +2489,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult executionResult = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - null, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - true); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + null, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); if (executionResult.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && executionResult.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + && executionResult.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { resp.setStatus(executionResult.status); return resp; } @@ -2530,13 +2530,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } catch (Exception e) { resp.setStatus( - onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); + onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); return null; } finally { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_STATEMENT, - statement.getType().name(), - System.nanoTime() - startTime); + OperationType.EXECUTE_STATEMENT, + statement.getType().name(), + System.nanoTime() - startTime); SESSION_MANAGER.updateIdleTime(); } } @@ -2556,8 +2556,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format("set device template %s.%s", req.getTemplateName(), req.getPrefixPath()), - statement); + String.format("set device template %s.%s", req.getTemplateName(), req.getPrefixPath()), + statement); } // permission check @@ -2569,20 +2569,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2603,9 +2603,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "unset device template %s from %s", req.getTemplateName(), req.getPrefixPath()), - statement); + String.format( + "unset device template %s from %s", req.getTemplateName(), req.getPrefixPath()), + statement); } // permission check @@ -2617,20 +2617,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2662,18 +2662,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2681,7 +2681,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus createTimeseriesUsingSchemaTemplate(TCreateTimeseriesUsingSchemaTemplateReq req) - throws TException { + throws TException { try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { @@ -2690,11 +2690,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 1: transfer to Statement BatchActivateTemplateStatement statement = - StatementGenerator.createBatchActivateTemplateStatement(req.getDevicePathList()); + StatementGenerator.createBatchActivateTemplateStatement(req.getDevicePathList()); if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format("batch activate device template %s", req.getDevicePathList()), statement); + String.format("batch activate device template %s", req.getDevicePathList()), statement); } // permission check @@ -2706,20 +2706,20 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.EXECUTE_STATEMENT, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { SESSION_MANAGER.updateIdleTime(); } @@ -2728,12 +2728,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus handshake(final TSyncIdentityInfo info) throws TException { return PipeDataNodeAgent.receiver() - .legacy() - .handshake( - info, - SESSION_MANAGER.getCurrSession().getClientAddress(), - partitionFetcher, - schemaFetcher); + .legacy() + .handshake( + info, + SESSION_MANAGER.getCurrSession().getClientAddress(), + partitionFetcher, + schemaFetcher); } @Override @@ -2743,7 +2743,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff) - throws TException { + throws TException { return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff); } @@ -2789,10 +2789,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (ENABLE_AUDIT_LOG) { AuditLogger.log( - String.format( - "insertStringRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), - statement, - true); + String.format( + "insertStringRecord, device %s, time %s", req.getPrefixPath(), req.getTimestamp()), + statement, + true); } // Permission check @@ -2802,29 +2802,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), statement); // Step 2: Call the coordinator final long queryId = SESSION_MANAGER.requestQueryId(); final ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); + COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode()); } catch (Exception e) { return onNpeOrUnexpectedException( - e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); + e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { CommonUtils.addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2847,8 +2847,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { protected TSStatus getNotLoggedInStatus() { return RpcUtils.getStatus( - TSStatusCode.NOT_LOGIN, - "Log in failed. Either you are not authorized or the session has timed out."); + TSStatusCode.NOT_LOGIN, + "Log in failed. Either you are not authorized or the session has timed out."); } private String checkIdentifierAndRemoveBackQuotesIfNecessary(String identifier) { @@ -2880,74 +2880,74 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { * @return the execution result */ private ExecutionResult executeBatchStatement( - final Statement statement, - final long queryId, - final SessionInfo sessionInfo, - final String statementStr, - final IPartitionFetcher partitionFetcher, - final ISchemaFetcher schemaFetcher, - final long timeoutMs, - final boolean userQuery) { + final Statement statement, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final IPartitionFetcher partitionFetcher, + final ISchemaFetcher schemaFetcher, + final long timeoutMs, + final boolean userQuery) { ExecutionResult result = null; final List<? extends Statement> subStatements = statement.getSubStatements(); final int totalSubStatements = subStatements.size(); LOGGER.info( - "Start batch executing {} sub-statement(s) in tree model, queryId: {}", - totalSubStatements, - queryId); + "Start batch executing {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); for (int i = 0; i < totalSubStatements; i++) { final Statement subStatement = subStatements.get(i); LOGGER.info( - "Executing sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); result = - COORDINATOR.executeForTreeModel( - subStatement, - queryId, - sessionInfo, - statementStr, - partitionFetcher, - schemaFetcher, - timeoutMs, - userQuery); + COORDINATOR.executeForTreeModel( + subStatement, + queryId, + sessionInfo, + statementStr, + partitionFetcher, + schemaFetcher, + timeoutMs, + userQuery); // Exit early if any sub-statement execution fails if (result != null - && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final int completed = i + 1; final int remaining = totalSubStatements - completed; final double percentage = (completed * 100.0) / totalSubStatements; LOGGER.warn( - "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", - i + 1, - totalSubStatements, - queryId, - completed, - remaining, - String.format("%.2f", percentage), - result.status.getMessage()); + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); break; } LOGGER.info( - "Successfully executed sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", - totalSubStatements, - queryId); + "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); } return result; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java index f085900a153..7248fe26046 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java @@ -317,7 +317,9 @@ public interface ISchemaRegion { throws MetadataException; int fillLastQueryMap( - final PartialPath pattern, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, final PathPatternTree scope) + final PartialPath pattern, + final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, + final PathPatternTree scope) throws MetadataException; // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index a964b7de4af..2f75914e51f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -1335,7 +1335,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { @Override public int fillLastQueryMap( - final PartialPath pattern, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, final PathPatternTree scope) + final PartialPath pattern, + final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, + final PathPatternTree scope) throws MetadataException { return mTree.fillLastQueryMap(pattern, mapToFill, scope); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index d4e773dd593..73bbce272e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -1440,7 +1440,9 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { @Override public int fillLastQueryMap( - final PartialPath pattern, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, final PathPatternTree scope) { + final PartialPath pattern, + final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, + final PathPatternTree scope) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index e55b79f4645..b6f1221f701 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -1085,7 +1085,9 @@ public class MTreeBelowSGMemoryImpl { } public int fillLastQueryMap( - final PartialPath prefixPath, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, final PathPatternTree scope) + final PartialPath prefixPath, + final Map<PartialPath, Map<String, TimeValuePair>> mapToFill, + final PathPatternTree scope) throws MetadataException { final int[] sensorNum = {0}; try (final EntityUpdater<IMemMNode> updater =
