JackieTien97 commented on code in PR #17027:
URL: https://github.com/apache/iotdb/pull/17027#discussion_r2752367539


##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBTablePreparedStatement extends IoTDBStatement implements 
PreparedStatement {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBTablePreparedStatement.class);
+  private static final String METHOD_NOT_SUPPORTED_STRING = "Method not 
supported";
+
+  private final String sql;
+  private final String preparedStatementName;
+  private final int parameterCount;
+  private final boolean serverSidePrepared;
+
+  private final Object[] parameterValues;
+  private final int[] parameterTypes;
+
+  // retain parameters for backward compatibility
+  private final Map<Integer, String> parameters = new HashMap<>();
+
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection,
+      Iface client,
+      Long sessionId,
+      String sql,
+      ZoneId zoneId,
+      Charset charset)
+      throws SQLException {
+    super(connection, client, sessionId, zoneId, charset);
+    this.sql = sql;
+    this.preparedStatementName = generateStatementName();
+
+    if (isQueryStatement(sql)) {
+      // Send PREPARE request to server only for query statements
+      this.serverSidePrepared = true;
+      TSPrepareReq prepareReq = new TSPrepareReq();
+      prepareReq.setSessionId(sessionId);
+      prepareReq.setSql(sql);
+      prepareReq.setStatementName(preparedStatementName);
+
+      try {
+        TSPrepareResp resp = client.prepareStatement(prepareReq);
+        RpcUtils.verifySuccess(resp.getStatus());
+
+        this.parameterCount = resp.isSetParameterCount() ? 
resp.getParameterCount() : 0;
+        this.parameterValues = new Object[parameterCount];
+        this.parameterTypes = new int[parameterCount];
+
+        for (int i = 0; i < parameterCount; i++) {
+          parameterTypes[i] = Types.NULL;
+        }
+      } catch (TException | StatementExecutionException e) {
+        throw new SQLException("Failed to prepare statement: " + 
e.getMessage(), e);
+      }
+    } else {
+      // For non-query statements, only keep text parameters for client-side 
substitution.
+      this.serverSidePrepared = false;
+      this.parameterCount = 0;
+      this.parameterValues = null;
+      this.parameterTypes = null;
+    }
+  }
+
+  // Only for tests
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection, Iface client, Long sessionId, String sql, 
ZoneId zoneId)
+      throws SQLException {
+    this(connection, client, sessionId, sql, zoneId, 
TSFileConfig.STRING_CHARSET);
+  }
+
+  private String generateStatementName() {
+    // StatementId is unique across all sessions in one IoTDB instance
+    return "jdbc_ps_" + getStmtId();
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    super.addBatch(createCompleteSql(sql, parameters));
+  }
+
+  @Override
+  public void clearParameters() {
+    this.parameters.clear();
+    if (serverSidePrepared) {
+      for (int i = 0; i < parameterCount; i++) {
+        parameterValues[i] = null;
+        parameterTypes[i] = Types.NULL;
+      }
+    }
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    if (isQueryStatement(sql)) {
+      TSExecuteStatementResp resp = executeInternal();
+      return resp.isSetQueryDataSet() || resp.isSetQueryResult();
+    } else {
+      return super.execute(createCompleteSql(sql, parameters));
+    }
+  }
+
+  private boolean isQueryStatement(String sql) {
+    if (sql == null) {
+      return false;
+    }
+    String trimmedSql = sql.trim().toUpperCase();
+    return trimmedSql.startsWith("SELECT");

Review Comment:
   Not right, we also support with clause in iotdb now



##########
iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift:
##########
@@ -167,6 +167,34 @@ struct TSCloseOperationReq {
   4: optional string preparedStatementName
 }
 
+// PREPARE
+struct TSPrepareReq {
+  1: required i64 sessionId
+  2: required string sql
+  3: required string statementName
+}
+
+struct TSPrepareResp {
+  1: required common.TSStatus status
+  2: optional i32 parameterCount

Review Comment:
   ```suggestion
     3: optional i32 datanode_id
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);

Review Comment:
   ```suggestion
             resp.setMoreData(!finished);
             
             if (quota != null) {
               quota.addReadResult(resp.getQueryResult());
             }
             // Should return SUCCESS_MESSAGE for insert into query
             if (queryExecution.getQueryType() == QueryType.READ_WRITE) {
               resp.setColumns(null);
             }
   ```
   
   add quota check



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementHelper.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.session;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+
+/** Helper for prepared statement registration/unregistration. */
+public class PreparedStatementHelper {
+
+  private PreparedStatementHelper() {}
+
+  /** Registers a prepared statement in the session. */
+  public static PreparedStatementInfo register(
+      IClientSession session, String statementName, Statement sql) {
+    if (session.getPreparedStatement(statementName) != null) {
+      throw new SemanticException(
+          String.format("Prepared statement '%s' already exists", 
statementName));
+    }
+
+    long memorySizeInBytes = sql == null ? 0L : sql.ramBytesUsed();
+
+    PreparedStatementMemoryManager.getInstance().allocate(statementName, 
memorySizeInBytes);
+
+    PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, 
memorySizeInBytes);
+    session.addPreparedStatement(statementName, info);
+
+    return info;
+  }
+
+  /** Unregisters a prepared statement from the session. */
+  public static PreparedStatementInfo unregister(IClientSession session, 
String statementName) {

Review Comment:
   ```suggestion
     public static void unregister(IClientSession session, String 
statementName) {
   ```
   return value is never used.



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.stmt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Serializer for PreparedStatement parameters. */
+public class PreparedParameterSerializer {

Review Comment:
   ```suggestion
   public class PreparedParameterSerDe {
   ```
   Not only Serializer, but also Deserializer



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();

Review Comment:
   ```suggestion
               String statementName = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId() + 
req.getStatementName();
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);

Review Comment:
   ```suggestion
         resp.setParameterCount(parameterCount);
         
resp.setDataNodeId(IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
   ```



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBTablePreparedStatement extends IoTDBStatement implements 
PreparedStatement {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBTablePreparedStatement.class);
+  private static final String METHOD_NOT_SUPPORTED_STRING = "Method not 
supported";
+
+  private final String sql;
+  private final String preparedStatementName;
+  private final int parameterCount;
+  private final boolean serverSidePrepared;
+
+  private final Object[] parameterValues;
+  private final int[] parameterTypes;
+
+  // retain parameters for backward compatibility
+  private final Map<Integer, String> parameters = new HashMap<>();
+
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection,
+      Iface client,
+      Long sessionId,
+      String sql,
+      ZoneId zoneId,
+      Charset charset)
+      throws SQLException {
+    super(connection, client, sessionId, zoneId, charset);
+    this.sql = sql;
+    this.preparedStatementName = generateStatementName();
+
+    if (isQueryStatement(sql)) {
+      // Send PREPARE request to server only for query statements
+      this.serverSidePrepared = true;
+      TSPrepareReq prepareReq = new TSPrepareReq();
+      prepareReq.setSessionId(sessionId);
+      prepareReq.setSql(sql);
+      prepareReq.setStatementName(preparedStatementName);
+
+      try {
+        TSPrepareResp resp = client.prepareStatement(prepareReq);
+        RpcUtils.verifySuccess(resp.getStatus());
+
+        this.parameterCount = resp.isSetParameterCount() ? 
resp.getParameterCount() : 0;
+        this.parameterValues = new Object[parameterCount];
+        this.parameterTypes = new int[parameterCount];
+
+        for (int i = 0; i < parameterCount; i++) {
+          parameterTypes[i] = Types.NULL;
+        }
+      } catch (TException | StatementExecutionException e) {
+        throw new SQLException("Failed to prepare statement: " + 
e.getMessage(), e);
+      }
+    } else {
+      // For non-query statements, only keep text parameters for client-side 
substitution.
+      this.serverSidePrepared = false;
+      this.parameterCount = 0;
+      this.parameterValues = null;
+      this.parameterTypes = null;
+    }
+  }
+
+  // Only for tests
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection, Iface client, Long sessionId, String sql, 
ZoneId zoneId)
+      throws SQLException {
+    this(connection, client, sessionId, sql, zoneId, 
TSFileConfig.STRING_CHARSET);
+  }
+
+  private String generateStatementName() {
+    // StatementId is unique across all sessions in one IoTDB instance
+    return "jdbc_ps_" + getStmtId();

Review Comment:
   in this way, it's not always unique across all sessions in one IoTDB 
instance, you should add a datanodeid_ prefix.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e,
+              OperationType.EXECUTE_PREPARED_STATEMENT.getName(),
+              TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {

Review Comment:
   catch error here, do exactly what executeStatementInternal does



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e,
+              OperationType.EXECUTE_PREPARED_STATEMENT.getName(),
+              TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId, null, t);
+      }
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+    }
+  }
+
+  @Override
+  public TSStatus deallocatePreparedStatement(TSDeallocatePreparedReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return getNotLoggedInStatus();
+    }
+
+    try {
+      PreparedStatementHelper.unregister(clientSession, 
req.getStatementName());
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onQueryException(
+          e,
+          OperationType.DEALLOCATE_PREPARED_STATEMENT.getName(),
+          TSStatusCode.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private Literal convertToLiteral(DeserializedParam param) {
+    if (param.isNull()) {
+      return new NullLiteral();
+    }
+
+    switch (param.type) {
+      case BOOLEAN:
+        return new BooleanLiteral((Boolean) param.value ? "true" : "false");

Review Comment:
   ```suggestion
           return new BooleanLiteral((boolean) param.value ? "true" : "false");
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,

Review Comment:
   not enough? should contain all the constants? You can return it in 
`convertToLiteral` method.



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.stmt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Serializer for PreparedStatement parameters. */
+public class PreparedParameterSerializer {
+
+  public static class DeserializedParam {
+    public final TSDataType type;
+    public final Object value;
+
+    DeserializedParam(TSDataType type, Object value) {
+      this.type = type;
+      this.value = value;
+    }
+
+    public boolean isNull() {
+      return type == TSDataType.UNKNOWN || value == null;
+    }
+  }
+
+  private PreparedParameterSerializer() {}
+
+  /** Serialize parameters to binary format. */
+  public static ByteBuffer serialize(Object[] values, int[] jdbcTypes, int 
count) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+
+      dos.writeInt(count);
+      for (int i = 0; i < count; i++) {
+        serializeParameter(dos, values[i], jdbcTypes[i]);
+      }
+
+      dos.flush();
+      return ByteBuffer.wrap(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize parameters", e);
+    }
+  }
+
+  private static void serializeParameter(DataOutputStream dos, Object value, 
int jdbcType)
+      throws IOException {
+    if (value == null || jdbcType == Types.NULL) {
+      dos.writeByte(TSDataType.UNKNOWN.serialize());
+      return;
+    }
+
+    switch (jdbcType) {
+      case Types.BOOLEAN:
+        dos.writeByte(TSDataType.BOOLEAN.serialize());
+        dos.writeByte((Boolean) value ? 1 : 0);

Review Comment:
   use `ReadWriteIOUtils.write()` and `ReadWriteIOUtils.read(XXX)` instead of 
defining your own serde way.



##########
integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBTablePreparedStatementJDBCIT.java:
##########


Review Comment:
   add test case for each supported data type defined in TsDataType.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e,
+              OperationType.EXECUTE_PREPARED_STATEMENT.getName(),
+              TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId, null, t);
+      }
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);

Review Comment:
   why don't you follow the `executeStatementInternal` way?



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java:
##########
@@ -0,0 +1,739 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBTablePreparedStatement extends IoTDBStatement implements 
PreparedStatement {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBTablePreparedStatement.class);
+  private static final String METHOD_NOT_SUPPORTED_STRING = "Method not 
supported";
+
+  private final String sql;
+  private final String preparedStatementName;
+  private final int parameterCount;
+  private final boolean serverSidePrepared;
+
+  private final Object[] parameterValues;
+  private final int[] parameterTypes;
+
+  // retain parameters for backward compatibility
+  private final Map<Integer, String> parameters = new HashMap<>();
+
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection,
+      Iface client,
+      Long sessionId,
+      String sql,
+      ZoneId zoneId,
+      Charset charset)
+      throws SQLException {
+    super(connection, client, sessionId, zoneId, charset);
+    this.sql = sql;
+    this.preparedStatementName = generateStatementName();
+
+    if (isQueryStatement(sql)) {
+      // Send PREPARE request to server only for query statements
+      this.serverSidePrepared = true;
+      TSPrepareReq prepareReq = new TSPrepareReq();
+      prepareReq.setSessionId(sessionId);
+      prepareReq.setSql(sql);
+      prepareReq.setStatementName(preparedStatementName);
+
+      try {
+        TSPrepareResp resp = client.prepareStatement(prepareReq);
+        RpcUtils.verifySuccess(resp.getStatus());
+
+        this.parameterCount = resp.isSetParameterCount() ? 
resp.getParameterCount() : 0;
+        this.parameterValues = new Object[parameterCount];
+        this.parameterTypes = new int[parameterCount];
+
+        for (int i = 0; i < parameterCount; i++) {
+          parameterTypes[i] = Types.NULL;
+        }
+      } catch (TException | StatementExecutionException e) {
+        throw new SQLException("Failed to prepare statement: " + 
e.getMessage(), e);
+      }
+    } else {
+      // For non-query statements, only keep text parameters for client-side 
substitution.
+      this.serverSidePrepared = false;
+      this.parameterCount = 0;
+      this.parameterValues = null;
+      this.parameterTypes = null;
+    }
+  }
+
+  // Only for tests
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection, Iface client, Long sessionId, String sql, 
ZoneId zoneId)
+      throws SQLException {
+    this(connection, client, sessionId, sql, zoneId, 
TSFileConfig.STRING_CHARSET);
+  }
+
+  private String generateStatementName() {
+    // StatementId is unique across all sessions in one IoTDB instance
+    return "jdbc_ps_" + getStmtId();
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    super.addBatch(createCompleteSql(sql, parameters));
+  }
+
+  @Override
+  public void clearParameters() {
+    this.parameters.clear();
+    if (serverSidePrepared) {
+      for (int i = 0; i < parameterCount; i++) {
+        parameterValues[i] = null;
+        parameterTypes[i] = Types.NULL;
+      }
+    }
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    if (isQueryStatement(sql)) {
+      TSExecuteStatementResp resp = executeInternal();
+      return resp.isSetQueryDataSet() || resp.isSetQueryResult();
+    } else {
+      return super.execute(createCompleteSql(sql, parameters));
+    }
+  }
+
+  private boolean isQueryStatement(String sql) {
+    if (sql == null) {
+      return false;
+    }
+    String trimmedSql = sql.trim().toUpperCase();
+    return trimmedSql.startsWith("SELECT");

Review Comment:
   You should catch specific exception while calling `client.prepareStatement` 
method, depending on the server resp to decide whether this is a non-query sql, 
and then degrade to previous client-side way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to