Young-Leo commented on code in PR #17027:
URL: https://github.com/apache/iotdb/pull/17027#discussion_r2703294240


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,179 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  /**
+   * Handles the prepare-statement RPC: parses the SQL carried by the request, hands the resulting
+   * statement to PreparedStatementHelper.register together with the current session and the
+   * requested name, and reports the parameter count computed by ParameterExtractor.
+   *
+   * @param req request supplying the SQL text and the statement name
+   * @return the not-logged-in status when the session fails the login check; a SQL_PARSE_ERROR
+   *     status when the parser returns null; the status built by onQueryException when any
+   *     exception is thrown; otherwise SUCCESS_STATUS together with the parameter count
+   */
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      // Parse SQL to get Statement AST
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) { // a null parse result is reported as a parse error, not thrown
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      // Get parameter count before registering
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      // Register the prepared statement using helper
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) { // failures here are reported under the EXECUTE_STATEMENT operation type
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  /**
+   * Handles the execute-prepared-statement RPC by wrapping the request in an Execute AST node and
+   * running it through COORDINATOR.executeForTableModel.
+   *
+   * <p>The serialized parameters are deserialized and converted to literals first. When the
+   * coordinator reports a status other than SUCCESS_STATUS or REDIRECTION_RECOMMEND, that status
+   * is returned as-is. Otherwise, if the query execution exists and is a query, the response is
+   * built from its dataset header, filled by setResultForPrepared, and flagged with moreData; in
+   * every other case only the status is returned.
+   *
+   * @param req request supplying the statement name, serialized parameters, statement id and the
+   *     optional timeout and fetch size
+   * @return the not-logged-in response when the login check fails; a response built from
+   *     onQueryException when any exception is thrown; otherwise the execution response
+   */
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false; // when true, the finally block cleans up the query execution
+    long queryId = Long.MIN_VALUE; // still this sentinel in finally if requestQueryId is never reached
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null; // set in the catch block and handed to cleanupQueryExecution
+    try {
+      String statementName = req.getStatementName();
+
+      // Deserialize parameters and convert to Literal list
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      // Construct Execute AST node, reuse Coordinator's existing Execute 
handling logic
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      // Request query ID
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      // Execute using Coordinator (Coordinator internally handles Execute 
statement)
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true; // failed execution: nothing to fetch, clean up in finally
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize(); // NOTE(review): default is the Thrift max frame size - confirm this is the intended fetch size
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true; // no query execution or not a query: only the status is returned
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId, null, t);
+      }
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost); // NOTE(review): runs after cleanup when finished - confirm the execution is still tracked at this point
+    }
+  }
+
+  @Override
+  public TSStatus deallocatePreparedStatement(TSDeallocatePreparedReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return getNotLoggedInStatus();
+    }
+
+    try {
+      // Unregister the prepared statement using helper
+      PreparedStatementHelper.unregister(clientSession, 
req.getStatementName());
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onQueryException(
+          e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to