Young-Leo commented on code in PR #17027:
URL: https://github.com/apache/iotdb/pull/17027#discussion_r2703294240
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,179 @@ public TSStatus closeOperation(TSCloseOperationReq req)
{
COORDINATOR::cleanupQueryExecution);
}
+ // ========================= PreparedStatement RPC Methods
=========================
+
+ @Override
+ public TSPrepareResp prepareStatement(TSPrepareReq req) {
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return new TSPrepareResp(getNotLoggedInStatus());
+ }
+
+ try {
+ String sql = req.getSql();
+ String statementName = req.getStatementName();
+
+ // Parse SQL to get Statement AST
+ org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
statement =
+ relationSqlParser.createStatement(sql, clientSession.getZoneId(),
clientSession);
+
+ if (statement == null) {
+ return new TSPrepareResp(
+ RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse
SQL: " + sql));
+ }
+
+ // Get parameter count before registering
+ int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+ // Register the prepared statement using helper
+ PreparedStatementHelper.register(clientSession, statementName,
statement);
+
+ TSPrepareResp resp = new
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ resp.setParameterCount(parameterCount);
+ return resp;
+ } catch (Exception e) {
+ return new TSPrepareResp(
+ onQueryException(
+ e, OperationType.EXECUTE_STATEMENT.getName(),
TSStatusCode.INTERNAL_SERVER_ERROR));
+ }
+ }
+
+ @Override
+ public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq
req) {
+ boolean finished = false;
+ long queryId = Long.MIN_VALUE;
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+ }
+
+ long startTime = System.nanoTime();
+ Throwable t = null;
+ try {
+ String statementName = req.getStatementName();
+
+ // Deserialize parameters and convert to Literal list
+ List<DeserializedParam> rawParams =
+
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+ List<Literal> parameters = new ArrayList<>(rawParams.size());
+ for (DeserializedParam param : rawParams) {
+ parameters.add(convertToLiteral(param));
+ }
+
+ // Construct Execute AST node, reuse Coordinator's existing Execute
handling logic
+ Execute executeStatement = new Execute(new Identifier(statementName),
parameters);
+
+ // Request query ID
+ queryId = SESSION_MANAGER.requestQueryId(clientSession,
req.getStatementId());
+
+ // Execute using Coordinator (Coordinator internally handles Execute
statement)
+ long timeout = req.isSetTimeout() ? req.getTimeout() :
config.getQueryTimeoutThreshold();
+ ExecutionResult result =
+ COORDINATOR.executeForTableModel(
+ executeStatement,
+ relationSqlParser,
+ clientSession,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ "EXECUTE " + statementName,
+ metadata,
+ timeout,
+ true);
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ finished = true;
+ return RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+
+ IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+ try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
+ TSExecuteStatementResp resp;
+ if (queryExecution != null && queryExecution.isQuery()) {
+ resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+ resp.setStatus(result.status);
+ int fetchSize =
+ req.isSetFetchSize() ? req.getFetchSize() :
config.getThriftMaxFrameSize();
+ finished = setResultForPrepared.apply(resp, queryExecution,
fetchSize);
+ resp.setMoreData(!finished);
+ } else {
+ finished = true;
+ resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(
+ e, OperationType.EXECUTE_STATEMENT.getName(),
TSStatusCode.INTERNAL_SERVER_ERROR));
+ } finally {
+ long currentOperationCost = System.nanoTime() - startTime;
+ if (finished) {
+ COORDINATOR.cleanupQueryExecution(queryId, null, t);
+ }
+ COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+ }
+ }
+
+ @Override
+ public TSStatus deallocatePreparedStatement(TSDeallocatePreparedReq req) {
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return getNotLoggedInStatus();
+ }
+
+ try {
+ // Unregister the prepared statement using helper
+ PreparedStatementHelper.unregister(clientSession,
req.getStatementName());
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ } catch (Exception e) {
+ return onQueryException(
+ e, OperationType.EXECUTE_STATEMENT.getName(),
TSStatusCode.INTERNAL_SERVER_ERROR);
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]