This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c233196c8c7 [To dev/1.3] Implemented fast last query for one prefix
path (#15810)
c233196c8c7 is described below
commit c233196c8c7577e4865516cd2e1bf536b8b4236e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 29 18:13:33 2025 +0800
[To dev/1.3] Implemented fast last query for one prefix path (#15810)
* Fast last query on local
* Remove debug settings
* Remove debug settings 2
* [TIMECHODB]rest service add FastLastQuery method
* [TIMECHODB]Fixed multiple device return value format issues with REST
service
* Fix
* [TIMECHODB]Fixed the format issue of return values for the fast and query
interfaces of the rest service
* Remove binary
* wildcard detection
* Update RestApiServiceImpl.java
* filter
* fix
* fix
* fix-2
* license
---------
Co-authored-by: luke.miao <[email protected]>
---
.../java/org/apache/iotdb/isession/ISession.java | 4 +
.../apache/iotdb/isession/pool/ISessionPool.java | 4 +
.../java/org/apache/iotdb/session/Session.java | 6 +
.../apache/iotdb/session/SessionConnection.java | 38 +++++
.../org/apache/iotdb/session/pool/SessionPool.java | 27 ++++
.../protocol/rest/v2/handler/FastLastHandler.java | 73 +++++++++
.../rest/v2/handler/RequestValidationHandler.java | 8 +
.../protocol/rest/v2/impl/RestApiServiceImpl.java | 171 ++++++++++++++++++---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 91 +++++++++++
.../analyze/cache/schema/DataNodeSchemaCache.java | 5 +
.../analyze/cache/schema/SchemaCacheEntry.java | 1 -
.../cache/schema/TimeSeriesSchemaCache.java | 5 +
.../cache/schema/dualkeycache/IDualKeyCache.java | 4 +
.../schema/dualkeycache/impl/DualKeyCacheImpl.java | 20 ++-
.../schemaengine/schemaregion/ISchemaRegion.java | 5 +
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 8 +
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 7 +
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 26 ++++
.../openapi/src/main/openapi3/iotdb_rest_v2.yaml | 25 +++
.../thrift-datanode/src/main/thrift/client.thrift | 12 ++
20 files changed, 514 insertions(+), 26 deletions(-)
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 85a8c9c00d5..d53963fb149 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -176,6 +177,9 @@ public interface ISession extends AutoCloseable {
SessionDataSet executeLastDataQuery(List<String> paths)
throws StatementExecutionException, IoTDBConnectionException;
+ /**
+  * Runs a fast last-value query for the series below a single prefix path.
+  *
+  * @param prefixes the nodes of the prefix path, in order; the server side of this change
+  *     turns the list into one path (it is not a list of independent paths)
+  * @return the data set holding the query result
+  */
+ SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String>
prefixes)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
+
SessionDataSet executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
throws StatementExecutionException, IoTDBConnectionException;
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index eeb6509e89d..1193435889d 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -486,6 +487,9 @@ public interface ISessionPool {
String db, String device, List<String> sensors, boolean isLegalPathNodes)
throws StatementExecutionException, IoTDBConnectionException;
+ /**
+  * Pooled variant of the fast last-value query for the series below a single prefix path.
+  *
+  * @param prefixes the nodes of the prefix path, in order
+  * @return a wrapper around the resulting data set
+  */
+ SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final
List<String> prefixes)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException;
+
SessionDataSetWrapper executeAggregationQuery(
List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index e61d86c2f20..ff7c927871d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1046,6 +1046,12 @@ public class Session implements ISession {
return executeLastDataQuery(paths, time, queryTimeoutInMs);
}
+  /**
+   * Runs the fast last-value query for one prefix path on the default session connection.
+   *
+   * @param prefixes the nodes of the prefix path, handed to the connection unchanged
+   * @return the data set produced by the default session connection
+   */
+  @Override
+  public SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    final SessionDataSet lastValues =
+        defaultSessionConnection.executeLastDataQueryForOnePrefixPath(prefixes);
+    return lastValues;
+  }
+
@Override
public SessionDataSet executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 425ae908e43..bee82eb2f14 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -442,6 +443,43 @@ public class SessionConnection {
zoneId);
}
+  /**
+   * Sends a fast last-value query for one prefix path to the server and wraps the reply in a
+   * {@link SessionDataSet}.
+   *
+   * <p>The status is verified with redirection handling only when the call succeeded without any
+   * retry; after a retry the plain success check is used instead.
+   *
+   * @param prefixes the nodes of the prefix path
+   * @return a data set built from the server response
+   */
+  protected SessionDataSet executeLastDataQueryForOnePrefixPath(final List<String> prefixes)
+      throws StatementExecutionException, IoTDBConnectionException, RedirectException {
+    final TSFastLastDataQueryForOnePrefixPathReq request =
+        new TSFastLastDataQueryForOnePrefixPathReq(sessionId, prefixes, statementId);
+    request.setFetchSize(session.fetchSize);
+    request.setEnableRedirectQuery(enableRedirect);
+
+    final RetryResult<TSExecuteStatementResp> outcome =
+        callWithReconnect(
+            () -> {
+              // The ids are written again on every attempt; presumably a reconnect can
+              // replace them - confirm against callWithReconnect.
+              request.setSessionId(sessionId);
+              request.setStatementId(statementId);
+              return client.executeFastLastDataQueryForOnePrefixPath(request);
+            });
+    final TSExecuteStatementResp response = outcome.getResult();
+
+    final boolean retried = outcome.getRetryAttempts() != 0;
+    if (retried) {
+      RpcUtils.verifySuccess(response.getStatus());
+    } else {
+      RpcUtils.verifySuccessWithRedirection(response.getStatus());
+    }
+
+    return new SessionDataSet(
+        "",
+        response.getColumns(),
+        response.getDataTypeList(),
+        response.columnNameIndexMap,
+        response.getQueryId(),
+        statementId,
+        client,
+        sessionId,
+        response.queryResult,
+        response.isIgnoreTimeStamp(),
+        response.moreData,
+        zoneId);
+  }
+
protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean
isLegalPathNodes, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 301b245b70e..2f69145eb28 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -3167,6 +3167,33 @@ public class SessionPool implements ISessionPool {
return null;
}
+  /**
+   * Runs the fast last-value query for one prefix path on a pooled session, trying up to
+   * {@code RETRY} times when the connection fails.
+   *
+   * <p>On success the session is marked as occupied and stays attached to the returned wrapper.
+   * On a statement or runtime failure the session is put back and the exception is rethrown; any
+   * other throwable is logged, the session is put back, and it is rethrown wrapped in a
+   * {@link RuntimeException}.
+   *
+   * @param prefixes the nodes of the prefix path
+   * @return a wrapper around the resulting data set
+   */
+  @Override
+  public SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final ISession pooled = getSession();
+      try {
+        final SessionDataSet dataSet = pooled.executeFastLastDataQueryForOnePrefixPath(prefixes);
+        final SessionDataSetWrapper wrapped = new SessionDataSetWrapper(dataSet, pooled, this);
+        occupy(pooled);
+        return wrapped;
+      } catch (IoTDBConnectionException e) {
+        // A connection failure means this session is unusable: discard it and try another one.
+        LOGGER.warn("executeLastDataQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(pooled, attempt, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(pooled);
+        throw e;
+      } catch (Throwable e) {
+        LOGGER.error(EXECUTE_LASTDATAQUERY_ERROR, e);
+        putBack(pooled);
+        throw new RuntimeException(e);
+      }
+      attempt++;
+    }
+    // Not expected to be reached.
+    return null;
+  }
+
@Override
public SessionDataSetWrapper executeLastDataQueryForOneDevice(
String db, String device, List<String> sensors, boolean isLegalPathNodes)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/FastLastHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/FastLastHandler.java
new file mode 100644
index 00000000000..5d1c3d0109c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/FastLastHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.rest.v2.handler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.protocol.rest.v2.model.ExecutionStatus;
+import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/** Static helpers for building the request and responses of the REST v2 fast last query. */
+public class FastLastHandler {
+
+  /**
+   * Builds the regular last-data query request for a prefix path.
+   *
+   * @param clientSession session whose id is copied into the request
+   * @param prefixPathList holder of the prefix path nodes; they are joined with "." and suffixed
+   *     with ".**" to form the single queried path pattern
+   * @return a request with time set to {@code Long.MIN_VALUE} and legal path nodes enabled
+   */
+  public static TSLastDataQueryReq createTSLastDataQueryReq(
+      IClientSession clientSession, PrefixPathList prefixPathList) {
+    final TSLastDataQueryReq request = new TSLastDataQueryReq();
+    request.sessionId = clientSession.getId();
+    final String pathPattern = String.join(".", prefixPathList.getPrefixPaths()) + ".**";
+    request.paths = Collections.singletonList(pathPattern);
+    request.time = Long.MIN_VALUE;
+    request.setLegalPathNodes(true);
+    return request;
+  }
+
+  /**
+   * Builds an OK response whose entity carries the given status code and its enum name as the
+   * message.
+   */
+  public static Response buildErrorResponse(TSStatusCode statusCode) {
+    final org.apache.iotdb.db.protocol.rest.model.ExecutionStatus body =
+        new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+            .code(statusCode.getStatusCode())
+            .message(statusCode.name());
+    return Response.ok().entity(body).build();
+  }
+
+  /** Builds an OK response whose entity mirrors the code and message of {@code status}. */
+  public static Response buildExecutionStatusResponse(TSStatus status) {
+    final ExecutionStatus body =
+        new ExecutionStatus().code(status.getCode()).message(status.getMessage());
+    return Response.ok().entity(body).build();
+  }
+
+  /**
+   * Prepares an empty last-query result: three expression columns (Timeseries, Value, DataType),
+   * each declared as TEXT, plus empty value and timestamp lists.
+   */
+  public static void setupTargetDataSet(
+      org.apache.iotdb.db.protocol.rest.v2.model.QueryDataSet dataSet) {
+    final String[] headers = {"Timeseries", "Value", "DataType"};
+    for (final String header : headers) {
+      dataSet.addExpressionsItem(header);
+    }
+    for (int column = 0; column < headers.length; column++) {
+      dataSet.addDataTypesItem("TEXT");
+    }
+    dataSet.setValues(new ArrayList<>());
+    dataSet.setTimestamps(new ArrayList<>());
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/RequestValidationHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/RequestValidationHandler.java
index 9a620da64ed..c1fc9c6327f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/RequestValidationHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/RequestValidationHandler.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.protocol.rest.v2.handler;
import org.apache.iotdb.db.protocol.rest.v2.model.ExpressionRequest;
import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest;
import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList;
import org.apache.iotdb.db.protocol.rest.v2.model.SQL;
import org.apache.commons.lang3.Validate;
@@ -40,6 +41,13 @@ public class RequestValidationHandler {
}
}
+  /**
+   * Validates the body of a fast-last-query request.
+   *
+   * @param prefixPathList the request body
+   * @throws NullPointerException if the body or its prefix path list is null
+   * @throws IllegalArgumentException if the prefix path list is empty
+   */
+  public static void validatePrefixPaths(PrefixPathList prefixPathList) {
+    // A missing body would otherwise fail below with a message-less NullPointerException.
+    Objects.requireNonNull(prefixPathList, "prefix_paths should not be null");
+    Objects.requireNonNull(prefixPathList.getPrefixPaths(), "prefix_paths should not be null");
+    if (prefixPathList.getPrefixPaths().isEmpty()) {
+      throw new IllegalArgumentException("prefix_paths should not be empty");
+    }
+  }
+
public static void validateInsertTabletRequest(InsertTabletRequest
insertTabletRequest) {
Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps
should not be null");
Objects.requireNonNull(insertTabletRequest.getIsAligned(), "is_aligned
should not be null");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 6028d9d257e..ffaecaea7b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -18,26 +18,34 @@
package org.apache.iotdb.db.protocol.rest.v2.impl;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.protocol.rest.handler.AuthorizationHandler;
+import org.apache.iotdb.db.protocol.rest.model.ExecutionStatus;
import org.apache.iotdb.db.protocol.rest.utils.InsertTabletSortDataUtils;
+import org.apache.iotdb.db.protocol.rest.v2.NotFoundException;
import org.apache.iotdb.db.protocol.rest.v2.RestApiService;
import org.apache.iotdb.db.protocol.rest.v2.handler.ExceptionHandler;
import org.apache.iotdb.db.protocol.rest.v2.handler.ExecuteStatementHandler;
+import org.apache.iotdb.db.protocol.rest.v2.handler.FastLastHandler;
import org.apache.iotdb.db.protocol.rest.v2.handler.QueryDataSetHandler;
import org.apache.iotdb.db.protocol.rest.v2.handler.RequestValidationHandler;
import
org.apache.iotdb.db.protocol.rest.v2.handler.StatementConstructionHandler;
-import org.apache.iotdb.db.protocol.rest.v2.model.ExecutionStatus;
import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest;
import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest;
+import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList;
+import org.apache.iotdb.db.protocol.rest.v2.model.QueryDataSet;
import org.apache.iotdb.db.protocol.rest.v2.model.SQL;
+import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -46,15 +54,24 @@ import
org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.read.TimeValuePair;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
public class RestApiServiceImpl extends RestApiService {
@@ -80,6 +97,117 @@ public class RestApiServiceImpl extends RestApiService {
IoTDBRestServiceDescriptor.getInstance().getConfig().getRestQueryDefaultRowSizeLimit();
}
+  /**
+   * REST entry point for a fast last-value query below one prefix path.
+   *
+   * <p>The method collects the series below the prefix from every schema region whose database
+   * path overlaps the prefix, then asks the last cache for all of them. If every series is cached
+   * the response is built directly from the cache. Otherwise it falls back to a regular last
+   * query on {@code <prefix>.**}, which is validated, authorized and executed through the
+   * coordinator.
+   *
+   * @param prefixPathList request body holding the nodes of the prefix path
+   * @param securityContext security context used for the authorization check on the fallback
+   * @return always an OK response; failures are reported inside the entity
+   */
+  @Override
+  public Response executeFastLastQueryStatement(
+      PrefixPathList prefixPathList, SecurityContext securityContext) {
+    Long queryId = null;
+    Statement statement = null;
+    boolean finish = false;
+    long startTime = System.nanoTime();
+
+    try {
+      RequestValidationHandler.validatePrefixPaths(prefixPathList);
+
+      PartialPath prefixPath =
+          new PartialPath(prefixPathList.getPrefixPaths().toArray(new String[0]));
+      Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>();
+
+      // Only visit regions whose database is an ancestor or a descendant of the prefix.
+      final String prefixString = prefixPath.toString();
+      for (ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) {
+        final String databasePath = region.getDatabaseFullPath();
+        if (!prefixString.startsWith(databasePath) && !databasePath.startsWith(prefixString)) {
+          continue;
+        }
+        region.fillLastQueryMap(prefixPath, resultMap);
+      }
+
+      // Check cache first
+      if (!DataNodeSchemaCache.getInstance().getLastCache(resultMap)) {
+        // Cache miss: run a regular last query for "<prefix>.**".
+        IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+        TSLastDataQueryReq tsLastDataQueryReq =
+            FastLastHandler.createTSLastDataQueryReq(clientSession, prefixPathList);
+        statement = StatementGenerator.createStatement(tsLastDataQueryReq);
+
+        if (ExecuteStatementHandler.validateStatement(statement)) {
+          return FastLastHandler.buildErrorResponse(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+        }
+
+        // A non-null result is the rejection response and must stop the request here.
+        // Previously the value was discarded and the query ran regardless.
+        final Response authFailure =
+            authorizationHandler.checkAuthority(securityContext, statement);
+        if (authFailure != null) {
+          return authFailure;
+        }
+
+        queryId = SESSION_MANAGER.requestQueryId();
+        SessionInfo sessionInfo = SESSION_MANAGER.getSessionInfo(clientSession);
+
+        ExecutionResult result =
+            COORDINATOR.executeForTreeModel(
+                statement,
+                queryId,
+                sessionInfo,
+                "",
+                partitionFetcher,
+                schemaFetcher,
+                config.getQueryTimeoutThreshold(),
+                true);
+
+        finish = true;
+
+        if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          return FastLastHandler.buildExecutionStatusResponse(result.status);
+        }
+
+        IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+        try (SetThreadName ignored = new SetThreadName(result.queryId.getId())) {
+          return QueryDataSetHandler.fillQueryDataSet(
+              queryExecution, statement, defaultQueryRowLimit);
+        }
+      }
+
+      // Cache hit: build response directly
+      // NOTE(review): this path never calls authorizationHandler.checkAuthority - confirm that
+      // serving cached values without an authorization check is intended.
+      QueryDataSet targetDataSet = new QueryDataSet();
+
+      FastLastHandler.setupTargetDataSet(targetDataSet);
+      List<Object> timeseries = new ArrayList<>();
+      List<Object> valueList = new ArrayList<>();
+      List<Object> dataTypeList = new ArrayList<>();
+      for (Map.Entry<PartialPath, Map<String, TimeValuePair>> entry : resultMap.entrySet()) {
+        final String deviceWithPrefix = entry.getKey() + TsFileConstant.PATH_SEPARATOR;
+        for (Map.Entry<String, TimeValuePair> measurementEntry : entry.getValue().entrySet()) {
+          final TimeValuePair tvPair = measurementEntry.getValue();
+          valueList.add(tvPair.getValue().getStringValue());
+          dataTypeList.add(tvPair.getValue().getDataType().name());
+          targetDataSet.addTimestampsItem(tvPair.getTimestamp());
+          timeseries.add(deviceWithPrefix + measurementEntry.getKey());
+        }
+      }
+      // The three columns are only attached when at least one series was found.
+      if (!timeseries.isEmpty()) {
+        targetDataSet.addValuesItem(timeseries);
+        targetDataSet.addValuesItem(valueList);
+        targetDataSet.addValuesItem(dataTypeList);
+      }
+      return Response.ok().entity(targetDataSet).build();
+
+    } catch (Exception e) {
+      finish = true;
+      return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
+    } finally {
+      long costTime = System.nanoTime() - startTime;
+      // Latency is only recorded when the fallback statement was created.
+      Optional.ofNullable(statement)
+          .ifPresent(
+              s ->
+                  CommonUtils.addStatementExecutionLatency(
+                      OperationType.EXECUTE_QUERY_STATEMENT, s.getType().name(), costTime));
+      if (queryId != null) {
+        if (finish) {
+          long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+          CommonUtils.addQueryLatency(
+              statement != null ? statement.getType() : null,
+              executionTime > 0 ? executionTime : costTime);
+        }
+        COORDINATOR.cleanupQueryExecution(queryId);
+      }
+    }
+  }
+
@Override
public Response executeNonQueryStatement(SQL sql, SecurityContext
securityContext) {
Long queryId = null;
@@ -92,7 +220,7 @@ public class RestApiServiceImpl extends RestApiService {
if (statement == null) {
return Response.ok()
.entity(
- new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ new ExecutionStatus()
.code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
.message("This operation type is not supported"))
.build();
@@ -100,7 +228,7 @@ public class RestApiServiceImpl extends RestApiService {
if (!ExecuteStatementHandler.validateStatement(statement)) {
return Response.ok()
.entity(
- new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ new ExecutionStatus()
.code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
.build();
@@ -128,12 +256,9 @@ public class RestApiServiceImpl extends RestApiService {
return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
} finally {
long costTime = System.nanoTime() - startTime;
- Optional.ofNullable(statement)
- .ifPresent(
- s -> {
- CommonUtils.addStatementExecutionLatency(
- OperationType.EXECUTE_NON_QUERY_PLAN, s.getType().name(),
costTime);
- });
+ if (statement != null)
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.EXECUTE_NON_QUERY_PLAN, statement.getType().name(),
costTime);
if (queryId != null) {
if (finish) {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
@@ -158,7 +283,7 @@ public class RestApiServiceImpl extends RestApiService {
if (statement == null) {
return Response.ok()
.entity(
- new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ new ExecutionStatus()
.code(TSStatusCode.SQL_PARSE_ERROR.getStatusCode())
.message("This operation type is not supported"))
.build();
@@ -167,7 +292,7 @@ public class RestApiServiceImpl extends RestApiService {
if (ExecuteStatementHandler.validateStatement(statement)) {
return Response.ok()
.entity(
- new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
+ new ExecutionStatus()
.code(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.message(TSStatusCode.EXECUTE_STATEMENT_ERROR.name()))
.build();
@@ -214,10 +339,9 @@ public class RestApiServiceImpl extends RestApiService {
long costTime = System.nanoTime() - startTime;
Optional.ofNullable(statement)
.ifPresent(
- s -> {
- CommonUtils.addStatementExecutionLatency(
- OperationType.EXECUTE_QUERY_STATEMENT, s.getType().name(),
costTime);
- });
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.EXECUTE_QUERY_STATEMENT,
s.getType().name(), costTime));
if (queryId != null) {
if (finish) {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
@@ -231,7 +355,8 @@ public class RestApiServiceImpl extends RestApiService {
@Override
public Response insertRecords(
- InsertRecordsRequest insertRecordsRequest, SecurityContext
securityContext) {
+ InsertRecordsRequest insertRecordsRequest, SecurityContext
securityContext)
+ throws NotFoundException {
Long queryId = null;
long startTime = System.nanoTime();
InsertRowsStatement insertRowsStatement = null;
@@ -264,10 +389,9 @@ public class RestApiServiceImpl extends RestApiService {
long costTime = System.nanoTime() - startTime;
Optional.ofNullable(insertRowsStatement)
.ifPresent(
- s -> {
- CommonUtils.addStatementExecutionLatency(
- OperationType.INSERT_RECORDS, s.getType().name(),
costTime);
- });
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.INSERT_RECORDS, s.getType().name(),
costTime));
if (queryId != null) {
COORDINATOR.cleanupQueryExecution(queryId);
}
@@ -319,10 +443,9 @@ public class RestApiServiceImpl extends RestApiService {
long costTime = System.nanoTime() - startTime;
Optional.ofNullable(insertTabletStatement)
.ifPresent(
- s -> {
- CommonUtils.addStatementExecutionLatency(
- OperationType.INSERT_TABLET, s.getType().name(), costTime);
- });
+ s ->
+ CommonUtils.addStatementExecutionLatency(
+ OperationType.INSERT_TABLET, s.getType().name(),
costTime));
if (queryId != null) {
COORDINATOR.cleanupQueryExecution(queryId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 3c710d0a82f..64f5c0209fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -101,6 +101,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.CreateSc
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DropSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.SetSchemaTemplateStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -137,6 +139,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import
org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -173,6 +176,7 @@ import org.apache.thrift.TException;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -793,6 +797,93 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return executeLastDataQueryInternal(req, SELECT_RESULT);
}
+  /**
+   * Thrift entry point for a fast last-value query below one prefix path.
+   *
+   * <p>The prefix must not contain wildcards. The method collects the series below the prefix
+   * from every schema region whose database path overlaps the prefix and asks the last cache for
+   * all of them. If any series is missing from the cache it falls back to a regular last query on
+   * {@code <prefix>.**}; otherwise the response is built directly from the cached values.
+   *
+   * @param req request holding the nodes of the prefix path and the statement id
+   * @return the statement response; errors are reported through its status
+   */
+  @Override
+  public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
+      final TSFastLastDataQueryForOnePrefixPathReq req) {
+    final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    try {
+      final long queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+
+      // 1. Collect the sensors of every device below the prefix.
+      final PartialPath prefixPath = new PartialPath(req.getPrefixes().toArray(new String[0]));
+      if (prefixPath.hasWildcard()) {
+        // The error response must be returned; previously it was built and then dropped, so
+        // wildcard prefixes were not rejected at all.
+        return RpcUtils.getTSExecuteStatementResp(
+            new TSStatus(TSStatusCode.SEMANTIC_ERROR.getStatusCode())
+                .setMessage(
+                    "The \"executeFastLastDataQueryForOnePrefixPath\" does not support wildcards."));
+      }
+
+      final Map<PartialPath, Map<String, TimeValuePair>> resultMap = new HashMap<>();
+      int sensorNum = 0;
+
+      // Only visit regions whose database is an ancestor or a descendant of the prefix.
+      final String prefixString = prefixPath.toString();
+      for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) {
+        final String databasePath = region.getDatabaseFullPath();
+        if (!prefixString.startsWith(databasePath) && !databasePath.startsWith(prefixString)) {
+          continue;
+        }
+        sensorNum += region.fillLastQueryMap(prefixPath, resultMap);
+      }
+
+      // 2. Look up the last cache for everything collected above.
+      if (!DataNodeSchemaCache.getInstance().getLastCache(resultMap)) {
+        // 2.1 At least one sensor missed the cache: run a regular last query instead.
+        return executeLastDataQueryInternal(convert(req), SELECT_RESULT);
+      }
+
+      // 2.2 Every sensor hit the cache: build the result block from the cached values.
+      final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum);
+
+      for (final Map.Entry<PartialPath, Map<String, TimeValuePair>> result : resultMap.entrySet()) {
+        final String deviceWithPrefix = result.getKey() + TsFileConstant.PATH_SEPARATOR;
+        for (final Map.Entry<String, TimeValuePair> measurementLastEntry :
+            result.getValue().entrySet()) {
+          final TimeValuePair tvPair = measurementLastEntry.getValue();
+          LastQueryUtil.appendLastValue(
+              builder,
+              tvPair.getTimestamp(),
+              new Binary(
+                  deviceWithPrefix + measurementLastEntry.getKey(), TSFileConfig.STRING_CHARSET),
+              tvPair.getValue().getStringValue(),
+              tvPair.getValue().getDataType().name());
+        }
+      }
+
+      final TSExecuteStatementResp resp =
+          createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
+      resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
+      if (builder.isEmpty()) {
+        resp.setQueryResult(Collections.emptyList());
+      } else {
+        resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
+      }
+
+      resp.setMoreData(false);
+      return resp;
+    } catch (final Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    }
+  }
+
+  /**
+   * Translates a fast prefix-path request into a regular last-data query request.
+   *
+   * <p>The prefix nodes are joined with "." and suffixed with ".**" to form the single queried
+   * path pattern; the time is set to {@code Long.MIN_VALUE}. Session id, statement id, fetch
+   * size, redirect flag and timeout are copied from {@code req}.
+   */
+  private TSLastDataQueryReq convert(final TSFastLastDataQueryForOnePrefixPathReq req) {
+    final String pathPattern = String.join(".", req.getPrefixes()) + ".**";
+    final TSLastDataQueryReq lastQuery =
+        new TSLastDataQueryReq(
+            req.sessionId, Collections.singletonList(pathPattern), Long.MIN_VALUE, req.statementId);
+    lastQuery.setLegalPathNodes(true);
+    lastQuery.setTimeout(req.timeout);
+    lastQuery.setEnableRedirectQuery(req.enableRedirectQuery);
+    lastQuery.setFetchSize(req.fetchSize);
+    return lastQuery;
+  }
+
@Override
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
TSFastLastDataQueryForOneDeviceReq req) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index b81307ab7ab..1c8ac8cb0a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
@@ -197,6 +198,10 @@ public class DataNodeSchemaCache {
return timeSeriesSchemaCache.getLastCache(seriesPath);
}
+ /**
+  * Batch form of the last-cache lookup: hands {@code inputMap} to the time series schema cache
+  * and returns that cache's answer unchanged.
+  *
+  * @param inputMap outer key is a path, inner map goes from a name to its last value
+  * @return the result of {@code timeSeriesSchemaCache.getLastCache(inputMap)}
+  */
+ public boolean getLastCache(final Map<PartialPath, Map<String, TimeValuePair>> inputMap) {
+   final boolean lookupResult = timeSeriesSchemaCache.getLastCache(inputMap);
+   return lookupResult;
+ }
+
public void invalidateLastCache(PartialPath path) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
index b105156fdef..d97d5ac30f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java
@@ -36,7 +36,6 @@ public class SchemaCacheEntry implements
IMeasurementSchemaInfo {
private final String storageGroup;
private final IMeasurementSchema iMeasurementSchema;
-
private final Map<String, String> tagMap;
private final boolean isAligned;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
index fadd38c5ae4..51f22abd1e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
@@ -247,6 +248,10 @@ public class TimeSeriesSchemaCache {
return DataNodeLastCacheManager.getLastCache(entry);
}
+ /**
+  * Batch form of the last-cache lookup, implemented as a {@code batchGet} on the dual key cache
+  * with {@code DataNodeLastCacheManager.getLastCache} as the mapping from cached entry to result.
+  *
+  * @param inputMap outer key is a path, inner map goes from a name to its last value
+  * @return the result of the underlying {@code batchGet} call
+  */
+ public boolean getLastCache(final Map<PartialPath, Map<String, TimeValuePair>> inputMap) {
+   return dualKeyCache.batchGet(
+       inputMap, cachedEntry -> DataNodeLastCacheManager.getLastCache(cachedEntry));
+ }
+
/** get SchemaCacheEntry and update last cache */
@TestOnly
public void updateLastCache(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
index 95f0b0e8f00..397ca18e89f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import javax.annotation.concurrent.GuardedBy;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
/**
* This interfaces defines the behaviour of a dual key cache. A dual key cache
supports manage cache
@@ -39,6 +41,8 @@ public interface IDualKeyCache<FK, SK, V> {
/** Get the cache value with given first key and second key. */
V get(FK firstKey, SK secondKey);
+ /**
+  * Batch lookup over several first keys and, for each of them, several second keys. The value
+  * slots of the inner maps are overwritten in place with {@code mappingFunction} applied to the
+  * cached value.
+  *
+  * @param inputMap first key -> (second key -> result slot to fill)
+  * @param mappingFunction converts a cached value into the result stored in the inner map
+  * @return in {@code DualKeyCacheImpl}: {@code true} when every requested key pair has a cache
+  *     entry, {@code false} as soon as one first key or second key is missing
+  */
+ <R> boolean batchGet(final Map<FK, Map<SK, R>> inputMap, final Function<V,
R> mappingFunction);
+
/**
* Traverse target cache values via given first key and second keys provided
in computation and
* execute the defined computation logic. The computation is read only.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
index d6cc2dea13a..5a4024bb31d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java
@@ -79,6 +79,25 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK,
V>>
}
}
+ /**
+  * Resolves every (first key, second key) pair requested in {@code inputMap} against the cache
+  * and writes {@code mappingFunction} applied to the cached value back into the inner map.
+  *
+  * <p>The walk stops at the first pair without a cache entry and reports {@code false}; slots
+  * visited before that point keep whatever was already written to them.
+  *
+  * @param inputMap first key -> (second key -> result slot to fill)
+  * @param mappingFunction converts a cached value into the result stored in the inner map
+  * @return {@code true} if every requested pair had a cache entry, {@code false} otherwise
+  */
+ @Override
+ public <R> boolean batchGet(
+     final Map<FK, Map<SK, R>> inputMap, final Function<V, R> mappingFunction) {
+   for (final Map.Entry<FK, Map<SK, R>> request : inputMap.entrySet()) {
+     final ICacheEntryGroup<FK, SK, V, T> group = firstKeyMap.get(request.getKey());
+     if (group == null || !fillFromGroup(group, request.getValue(), mappingFunction)) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /** Fills the slots of one first key from its entry group; false on the first missing entry. */
+ private <R> boolean fillFromGroup(
+     final ICacheEntryGroup<FK, SK, V, T> group,
+     final Map<SK, R> slots,
+     final Function<V, R> mappingFunction) {
+   for (final Map.Entry<SK, R> slot : slots.entrySet()) {
+     final T hit = group.getCacheEntry(slot.getKey());
+     if (hit == null) {
+       return false;
+     }
+     // NOTE(review): the mapped result is stored even when it is null — confirm that callers
+     // iterating the filled map tolerate null values.
+     final R mapped = mappingFunction.apply(hit.getValue());
+     slot.setValue(mapped);
+   }
+   return true;
+ }
+
@Override
public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
FK firstKey = computation.getFirstKey();
@@ -393,7 +412,6 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK,
V>>
private static class SegmentedConcurrentHashMap<K, V> {
private static final int SLOT_NUM = 31;
-
private final Map<K, V>[] maps = new ConcurrentHashMap[SLOT_NUM];
V get(K key) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
index 12536f0dbed..6b050438062 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IAlterLogica
import
org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.ICreateLogicalViewPlan;
import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import java.io.File;
@@ -312,6 +313,10 @@ public interface ISchemaRegion {
long countPathsUsingTemplate(int templateId, PathPatternTree patternTree)
throws MetadataException;
+ /**
+  * Prepares {@code mapToFill} for a last query over the devices selected by {@code pattern}.
+  *
+  * <p>The in-memory schema region forwards this call to its mtree; the PBTree schema region
+  * throws {@link UnsupportedOperationException}.
+  *
+  * @param pattern path pattern selecting the devices to include
+  * @param mapToFill receives one entry per device, mapping names to (initially unset) last values
+  * @return presumably the number of measurement slots added to {@code mapToFill} — confirm
+  *     against the mtree implementation
+  * @throws MetadataException declared for implementations whose schema traversal can fail
+  */
+ int fillLastQueryMap(
+ final PartialPath pattern, final Map<PartialPath, Map<String,
TimeValuePair>> mapToFill)
+ throws MetadataException;
+
// endregion
// region Interfaces for SchemaReader
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 30312feaa32..95feb33e52a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -93,6 +93,7 @@ import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1322,6 +1323,13 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
return result;
}
+ /**
+  * Forwards the request to the in-memory mtree and returns its result unchanged.
+  *
+  * @param pattern path pattern handed to the mtree
+  * @param mapToFill map the mtree populates
+  * @return the value returned by {@code mtree.fillLastQueryMap}
+  * @throws MetadataException if the mtree call throws it
+  */
+ @Override
+ public int fillLastQueryMap(
+     final PartialPath pattern, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill)
+     throws MetadataException {
+   final int filledSlots = mtree.fillLastQueryMap(pattern, mapToFill);
+   return filledSlots;
+ }
+
@Override
public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan
showDevicesPlan)
throws MetadataException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 39ea98515e5..6ae88e49468 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -93,6 +93,7 @@ import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1427,6 +1428,12 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
return result;
}
+ /**
+  * Not available for the PBTree schema region: always throws.
+  *
+  * @param pattern ignored
+  * @param mapToFill ignored
+  * @return never returns normally
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public int fillLastQueryMap(
+     final PartialPath pattern, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill) {
+   final String reason = "Not implemented";
+   throw new UnsupportedOperationException(reason);
+ }
+
@Override
public ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan
showDevicesPlan)
throws MetadataException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index 2710b754fe5..9f6316583d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -75,6 +75,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -1074,6 +1075,31 @@ public class MTreeBelowSGMemoryImpl {
}
}
+ /**
+  * Runs an {@link EntityUpdater} over {@code prefixPath} and, for every device node it visits,
+  * puts an entry into {@code mapToFill}: the key is the device's partial path and the value maps
+  * the name of each measurement child to {@code null}.
+  *
+  * @param prefixPath path handed to the updater to select devices
+  * @param mapToFill receives one entry per visited device; an existing entry for the same path
+  *     is replaced
+  * @return the total number of measurement names placed into the per-device maps
+  * @throws MetadataException declared for the traversal; not raised directly by this method
+  */
+ public int fillLastQueryMap(
+     final PartialPath prefixPath, final Map<PartialPath, Map<String, TimeValuePair>> mapToFill)
+     throws MetadataException {
+   // One-element array so the anonymous updater below can accumulate into it.
+   final int[] measurementCount = new int[1];
+   try (final EntityUpdater<IMemMNode> deviceVisitor =
+       new EntityUpdater<IMemMNode>(
+           rootNode, prefixPath, store, true, SchemaConstant.ALL_MATCH_SCOPE) {
+
+         @Override
+         protected void updateEntity(final IDeviceMNode<IMemMNode> node) {
+           final Map<String, TimeValuePair> placeholders = new HashMap<>();
+           for (final IMemMNode child : node.getChildren().values()) {
+             if (!(child instanceof IMeasurementMNode)) {
+               continue;
+             }
+             // Value stays null here; only the measurement name is registered.
+             placeholders.put(child.getName(), null);
+           }
+           measurementCount[0] += placeholders.size();
+           mapToFill.put(node.getPartialPath(), placeholders);
+         }
+       }) {
+     deviceVisitor.update();
+   }
+   return measurementCount[0];
+ }
+
public ISchemaReader<ITimeSeriesSchemaInfo> getTimeSeriesReader(
IShowTimeSeriesPlan showTimeSeriesPlan,
Function<Long, Pair<Map<String, String>, Map<String, String>>>
tagAndAttributeProvider)
diff --git a/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_v2.yaml
b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_v2.yaml
index 04d12359da4..0cae51bef23 100644
--- a/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_v2.yaml
+++ b/iotdb-protocol/openapi/src/main/openapi3/iotdb_rest_v2.yaml
@@ -102,6 +102,23 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/QueryDataSet'
+ # POST endpoint for the fast last query: takes a PrefixPathList JSON body and answers 200 with a QueryDataSet.
+ /rest/v2/fastLastQuery:
+ post:
+ summary: executeFastLastQueryStatement
+ description: executeFastLastQueryStatement
+ operationId: executeFastLastQueryStatement
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/PrefixPathList'
+ responses:
+ "200":
+ description: QueryDataSet
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/QueryDataSet'
/grafana/v2/login:
get:
@@ -185,6 +202,14 @@ components:
type: integer
format: int32
+ # Request body schema of /rest/v2/fastLastQuery: an object whose "prefix_paths" property is an array of strings.
+ PrefixPathList:
+ title: PrefixPathList
+ type: object
+ properties:
+ prefix_paths:
+ type: array
+ items:
+ type: string
InsertTabletRequest:
title: InsertTabletRequest
type: object
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 0fa1a9a6ecb..1294d4cffbc 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -338,6 +338,16 @@ struct TSLastDataQueryReq {
9: optional bool legalPathNodes
}
+// Request of executeFastLastDataQueryForOnePrefixPath.
+// "prefixes" holds the nodes of the prefix path; when the server falls back to a regular
+// last query it joins them with "." and appends ".**".
+// sessionId, statementId, fetchSize, enableRedirectQuery and timeout are copied into that
+// fallback TSLastDataQueryReq.
+struct TSFastLastDataQueryForOnePrefixPathReq {
+ 1: required i64 sessionId
+ 2: required list<string> prefixes
+ 3: optional i32 fetchSize
+ 4: required i64 statementId
+ 5: optional bool enableRedirectQuery
+ 6: optional bool jdbcQuery
+ 7: optional i64 timeout
+}
+
struct TSFastLastDataQueryForOneDeviceReq {
1: required i64 sessionId
2: required string db
@@ -541,6 +551,8 @@ service IClientRPCService {
TSExecuteStatementResp executeLastDataQueryV2(1:TSLastDataQueryReq req);
+ // Fast last query for everything below one prefix path; see TSFastLastDataQueryForOnePrefixPathReq.
+ TSExecuteStatementResp
executeFastLastDataQueryForOnePrefixPath(1:TSFastLastDataQueryForOnePrefixPathReq
req);
+
TSExecuteStatementResp
executeFastLastDataQueryForOneDeviceV2(1:TSFastLastDataQueryForOneDeviceReq
req);
TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq
req);