JackieTien97 commented on code in PR #17027:
URL: https://github.com/apache/iotdb/pull/17027#discussion_r2761818559
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -559,17 +549,178 @@ private TSExecuteStatementResp executeStatementInternal(
}
}
+ private void clearUp(
+ IClientSession clientSession,
+ Long statementId,
+ Long queryId,
+ Supplier<String> contentSupplier,
+ Throwable t) {
+ COORDINATOR.cleanupQueryExecution(queryId, contentSupplier, t);
+ // clear up queryId Map in clientSession
+ clientSession.removeQueryId(statementId, queryId);
+ }
+
private void clearUp(
IClientSession clientSession,
Long statementId,
Long queryId,
org.apache.thrift.TBase<?, ?> req,
Throwable t) {
- COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ // Create Supplier that uses CommonUtils.getContentOfRequest for TBase
requests
+ Supplier<String> contentSupplier =
+ () -> CommonUtils.getContentOfRequest(req,
COORDINATOR.getQueryExecution(queryId));
Review Comment:
Why not directly use `ContentOfQuerySupplier` in `Coordinator`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -389,7 +382,7 @@ private TSExecuteStatementResp executeStatementInternal(
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
s);
statementType = s.getType();
- queryId = SESSION_MANAGER.requestQueryId(clientSession,
req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(clientSession,
request.getStatementId());
Review Comment:
Use `statementId` instead of `request.getStatementId()`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -559,17 +549,178 @@ private TSExecuteStatementResp executeStatementInternal(
}
}
+ private void clearUp(
+ IClientSession clientSession,
+ Long statementId,
+ Long queryId,
+ Supplier<String> contentSupplier,
+ Throwable t) {
+ COORDINATOR.cleanupQueryExecution(queryId, contentSupplier, t);
+ // clear up queryId Map in clientSession
+ clientSession.removeQueryId(statementId, queryId);
+ }
+
private void clearUp(
IClientSession clientSession,
Long statementId,
Long queryId,
org.apache.thrift.TBase<?, ?> req,
Throwable t) {
- COORDINATOR.cleanupQueryExecution(queryId, req, t);
+ // Create Supplier that uses CommonUtils.getContentOfRequest for TBase
requests
+ Supplier<String> contentSupplier =
+ () -> CommonUtils.getContentOfRequest(req,
COORDINATOR.getQueryExecution(queryId));
+ COORDINATOR.cleanupQueryExecution(queryId, contentSupplier, t);
// clear up queryId Map in clientSession
clientSession.removeQueryId(statementId, queryId);
}
+ /** Adapter that wraps TSExecuteStatementReq to implement
NativeStatementRequest. */
+ private static class TSExecuteStatementReqAdapter implements
NativeStatementRequest {
+ private final TSExecuteStatementReq req;
+
+ TSExecuteStatementReqAdapter(TSExecuteStatementReq req) {
+ this.req = req;
+ }
+
+ @Override
+ public Long getStatementId() {
+ return req.getStatementId();
+ }
+
+ @Override
+ public long getTimeout() {
+ return req.getTimeout();
+ }
+
+ @Override
+ public int getFetchSize() {
+ return req.getFetchSize();
+ }
+
+ @Override
+ public Statement getTreeStatement(ZoneId zoneId) {
+ return StatementGenerator.createStatement(req.getStatement(), zoneId);
+ }
+
+ @Override
+ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
getTableStatement(
+ SqlParser parser, ZoneId zoneId, IClientSession clientSession) {
+ return parser.createStatement(req.getStatement(), zoneId, clientSession);
+ }
+
+ @Override
+ public String getSql() {
+ return req.getStatement();
+ }
+ }
+
+ /** Adapter that wraps TSExecutePreparedReq to implement
NativeStatementRequest. */
+ private static class TSExecutePreparedReqAdapter implements
NativeStatementRequest {
+ private final TSExecutePreparedReq req;
+
+ // Lazily computed fields
+ private String fullStatement;
+ private Execute executeStatement;
+
+ TSExecutePreparedReqAdapter(TSExecutePreparedReq req) {
+ this.req = req;
+ }
+
+ @Override
+ public Long getStatementId() {
+ return req.getStatementId();
+ }
+
+ @Override
+ public long getTimeout() {
+ return req.getTimeout();
+ }
+
+ @Override
+ public int getFetchSize() {
+ return req.getFetchSize();
+ }
+
+ @Override
+ public Statement getTreeStatement(ZoneId zoneId) {
+ // PreparedStatement is primarily for Table model, return null for Tree
model
+ return null;
Review Comment:
Throw an unsupported-operation exception (e.g. `UnsupportedOperationException`) here instead of returning `null`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -711,47 +709,40 @@ public QueryId createQueryId() {
return queryIdGenerator.createNextQueryId();
}
- public void cleanupQueryExecution(
- Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable
t) {
+ public void cleanupQueryExecution(Long queryId, Supplier<String>
contentSupplier, Throwable t) {
IQueryExecution queryExecution = getQueryExecution(queryId);
if (queryExecution != null) {
- try (SetThreadName threadName = new
SetThreadName(queryExecution.getQueryId())) {
- LOGGER.debug("[CleanUpQuery]]");
- queryExecution.stopAndCleanup(t);
- boolean isUserQuery = queryExecution.isQuery() &&
queryExecution.isUserQuery();
- Supplier<String> contentOfQuerySupplier =
- new ContentOfQuerySupplier(nativeApiRequest, queryExecution);
- if (isUserQuery) {
- recordCurrentQueries(
- queryExecution.getQueryId(),
- queryExecution.getStartExecutionTime(),
- System.currentTimeMillis(),
- queryExecution.getTotalExecutionTime(),
- contentOfQuerySupplier,
- queryExecution.getUser(),
- queryExecution.getClientHostname());
- }
- queryExecutionMap.remove(queryId);
- if (isUserQuery) {
- recordQueries(queryExecution::getTotalExecutionTime,
contentOfQuerySupplier, t);
- }
- }
+ Supplier<String> finalSupplier =
+ contentSupplier != null
+ ? contentSupplier
+ : () -> queryExecution.getExecuteSQL().orElse("UNKNOWN");
Review Comment:
Why not use `ContentOfQuerySupplier`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -991,7 +1142,11 @@ public TSExecuteStatementResp
executeUpdateStatementV2(TSExecuteStatementReq req
@Override
public TSExecuteStatementResp executeStatementV2(TSExecuteStatementReq req) {
- return executeStatementInternal(req, SELECT_RESULT);
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
Review Comment:
No need to do this login check again here?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -437,7 +427,7 @@ private TSExecuteStatementResp executeStatementInternal(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not
supported"));
}
- queryId = SESSION_MANAGER.requestQueryId(clientSession,
req.statementId);
+ queryId = SESSION_MANAGER.requestQueryId(clientSession,
request.getStatementId());
Review Comment:
Use `statementId` instead of `request.getStatementId()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]