Copilot commented on code in PR #17027:
URL: https://github.com/apache/iotdb/pull/17027#discussion_r2702982788


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,179 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      // Parse SQL to get Statement AST
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      // Get parameter count before registering
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      // Register the prepared statement using helper
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      // Deserialize parameters and convert to Literal list
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      // Construct Execute AST node, reuse Coordinator's existing Execute 
handling logic
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      // Request query ID
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      // Execute using Coordinator (Coordinator internally handles Execute 
statement)
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId, null, t);
+      }
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+    }
+  }
+
+  @Override
+  public TSStatus deallocatePreparedStatement(TSDeallocatePreparedReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return getNotLoggedInStatus();
+    }
+
+    try {
+      // Unregister the prepared statement using helper
+      PreparedStatementHelper.unregister(clientSession, 
req.getStatementName());
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    } catch (Exception e) {
+      return onQueryException(
+          e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR);

Review Comment:
   The error message uses OperationType.EXECUTE_STATEMENT for a DEALLOCATE 
operation. This is misleading and could confuse users debugging errors. 
Consider using a more appropriate operation type like 
OperationType.DEALLOCATE_STATEMENT or creating a new operation type if it 
doesn't exist.
   ```suggestion
             e, OperationType.DEALLOCATE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,179 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      // Parse SQL to get Statement AST
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      // Get parameter count before registering
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      // Register the prepared statement using helper
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));

Review Comment:
   The error message uses OperationType.EXECUTE_STATEMENT for a PREPARE 
operation. This is misleading and could confuse users debugging errors. 
Consider using a more appropriate operation type like 
OperationType.PREPARE_STATEMENT or creating a new operation type if it doesn't 
exist.
   ```suggestion
                 e, OperationType.PREPARE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
   ```



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.stmt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for PreparedStatement parameters.
+ *
+ * <p>Binary format: [paramCount:4bytes][param1][param2]...
+ *
+ * <p>Each parameter: [type:1byte][value:variable]
+ */
+public class PreparedParameterSerializer {
+
+  /** Deserialized parameter holding type and value. */
+  public static class DeserializedParam {
+    public final TSDataType type;
+    public final Object value;
+
+    DeserializedParam(TSDataType type, Object value) {
+      this.type = type;
+      this.value = value;
+    }
+
+    public boolean isNull() {
+      return type == TSDataType.UNKNOWN || value == null;
+    }
+  }
+
+  private PreparedParameterSerializer() {}
+
+  // ================== Serialize (Client Side) ==================
+
+  /**
+   * Serialize parameters to binary format.
+   *
+   * @param values parameter values
+   * @param jdbcTypes JDBC type codes (java.sql.Types)
+   * @param count number of parameters
+   * @return ByteBuffer containing serialized parameters
+   */
+  public static ByteBuffer serialize(Object[] values, int[] jdbcTypes, int 
count) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+
+      dos.writeInt(count);
+      for (int i = 0; i < count; i++) {
+        serializeParameter(dos, values[i], jdbcTypes[i]);
+      }
+
+      dos.flush();
+      return ByteBuffer.wrap(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize parameters", e);
+    }
+  }
+
+  private static void serializeParameter(DataOutputStream dos, Object value, 
int jdbcType)
+      throws IOException {
+    if (value == null || jdbcType == Types.NULL) {
+      dos.writeByte(TSDataType.UNKNOWN.serialize());
+      return;
+    }
+
+    switch (jdbcType) {
+      case Types.BOOLEAN:
+        dos.writeByte(TSDataType.BOOLEAN.serialize());
+        dos.writeByte((Boolean) value ? 1 : 0);
+        break;
+
+      case Types.INTEGER:
+        dos.writeByte(TSDataType.INT32.serialize());
+        dos.writeInt(((Number) value).intValue());
+        break;
+
+      case Types.BIGINT:
+        dos.writeByte(TSDataType.INT64.serialize());
+        dos.writeLong(((Number) value).longValue());
+        break;
+
+      case Types.FLOAT:
+        dos.writeByte(TSDataType.FLOAT.serialize());
+        dos.writeFloat(((Number) value).floatValue());
+        break;
+
+      case Types.DOUBLE:
+        dos.writeByte(TSDataType.DOUBLE.serialize());
+        dos.writeDouble(((Number) value).doubleValue());
+        break;
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+        byte[] strBytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+        dos.writeByte(TSDataType.STRING.serialize());
+        dos.writeInt(strBytes.length);
+        dos.write(strBytes);
+        break;
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+        byte[] binBytes = (byte[]) value;
+        dos.writeByte(TSDataType.BLOB.serialize());
+        dos.writeInt(binBytes.length);
+        dos.write(binBytes);
+        break;
+
+      default:
+        byte[] defaultBytes = 
String.valueOf(value).getBytes(StandardCharsets.UTF_8);
+        dos.writeByte(TSDataType.STRING.serialize());
+        dos.writeInt(defaultBytes.length);
+        dos.write(defaultBytes);
+        break;
+    }
+  }
+
+  // ================== Deserialize (Server Side) ==================
+
+  /**
+   * Deserialize parameters from binary format.
+   *
+   * @param buffer ByteBuffer containing serialized parameters
+   * @return list of deserialized parameters with type and value
+   */
+  public static List<DeserializedParam> deserialize(ByteBuffer buffer) {
+    if (buffer == null || buffer.remaining() == 0) {
+      return new ArrayList<>();
+    }
+
+    buffer.rewind();
+    int count = buffer.getInt();
+    List<DeserializedParam> result = new ArrayList<>(count);

Review Comment:
   The deserialize method does not validate the count parameter read from the 
buffer. A malicious or corrupted buffer with a very large count value could 
cause excessive memory allocation or out-of-memory errors. Consider adding a 
reasonable upper bound validation on count before allocating the result list.



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.stmt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for PreparedStatement parameters.
+ *
+ * <p>Binary format: [paramCount:4bytes][param1][param2]...
+ *
+ * <p>Each parameter: [type:1byte][value:variable]
+ */
+public class PreparedParameterSerializer {
+
+  /** Deserialized parameter holding type and value. */
+  public static class DeserializedParam {
+    public final TSDataType type;
+    public final Object value;
+
+    DeserializedParam(TSDataType type, Object value) {
+      this.type = type;
+      this.value = value;
+    }
+
+    public boolean isNull() {
+      return type == TSDataType.UNKNOWN || value == null;
+    }
+  }
+
+  private PreparedParameterSerializer() {}
+
+  // ================== Serialize (Client Side) ==================
+
+  /**
+   * Serialize parameters to binary format.
+   *
+   * @param values parameter values
+   * @param jdbcTypes JDBC type codes (java.sql.Types)
+   * @param count number of parameters
+   * @return ByteBuffer containing serialized parameters
+   */
+  public static ByteBuffer serialize(Object[] values, int[] jdbcTypes, int 
count) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+
+      dos.writeInt(count);
+      for (int i = 0; i < count; i++) {
+        serializeParameter(dos, values[i], jdbcTypes[i]);
+      }
+
+      dos.flush();
+      return ByteBuffer.wrap(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize parameters", e);
+    }
+  }
+
+  private static void serializeParameter(DataOutputStream dos, Object value, 
int jdbcType)
+      throws IOException {
+    if (value == null || jdbcType == Types.NULL) {
+      dos.writeByte(TSDataType.UNKNOWN.serialize());
+      return;
+    }
+
+    switch (jdbcType) {
+      case Types.BOOLEAN:
+        dos.writeByte(TSDataType.BOOLEAN.serialize());
+        dos.writeByte((Boolean) value ? 1 : 0);
+        break;
+
+      case Types.INTEGER:
+        dos.writeByte(TSDataType.INT32.serialize());
+        dos.writeInt(((Number) value).intValue());
+        break;
+
+      case Types.BIGINT:
+        dos.writeByte(TSDataType.INT64.serialize());
+        dos.writeLong(((Number) value).longValue());
+        break;
+
+      case Types.FLOAT:
+        dos.writeByte(TSDataType.FLOAT.serialize());
+        dos.writeFloat(((Number) value).floatValue());
+        break;
+
+      case Types.DOUBLE:
+        dos.writeByte(TSDataType.DOUBLE.serialize());
+        dos.writeDouble(((Number) value).doubleValue());
+        break;
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+        byte[] strBytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+        dos.writeByte(TSDataType.STRING.serialize());
+        dos.writeInt(strBytes.length);
+        dos.write(strBytes);
+        break;
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+        byte[] binBytes = (byte[]) value;
+        dos.writeByte(TSDataType.BLOB.serialize());
+        dos.writeInt(binBytes.length);
+        dos.write(binBytes);
+        break;
+
+      default:
+        byte[] defaultBytes = 
String.valueOf(value).getBytes(StandardCharsets.UTF_8);
+        dos.writeByte(TSDataType.STRING.serialize());
+        dos.writeInt(defaultBytes.length);
+        dos.write(defaultBytes);
+        break;
+    }
+  }
+
+  // ================== Deserialize (Server Side) ==================
+
+  /**
+   * Deserialize parameters from binary format.
+   *
+   * @param buffer ByteBuffer containing serialized parameters
+   * @return list of deserialized parameters with type and value
+   */
+  public static List<DeserializedParam> deserialize(ByteBuffer buffer) {
+    if (buffer == null || buffer.remaining() == 0) {
+      return new ArrayList<>();
+    }
+
+    buffer.rewind();
+    int count = buffer.getInt();
+    List<DeserializedParam> result = new ArrayList<>(count);
+
+    for (int i = 0; i < count; i++) {
+      byte typeCode = buffer.get();
+      TSDataType type = TSDataType.deserialize(typeCode);
+      Object value = deserializeValue(buffer, type);
+      result.add(new DeserializedParam(type, value));
+    }
+
+    return result;
+  }
+
+  private static Object deserializeValue(ByteBuffer buffer, TSDataType type) {
+    switch (type) {
+      case UNKNOWN:
+        return null;
+      case BOOLEAN:
+        return buffer.get() != 0;
+      case INT32:
+        return buffer.getInt();
+      case INT64:
+        return buffer.getLong();
+      case FLOAT:
+        return buffer.getFloat();
+      case DOUBLE:
+        return buffer.getDouble();
+      case TEXT:
+      case STRING:
+        int strLen = buffer.getInt();
+        byte[] strBytes = new byte[strLen];
+        buffer.get(strBytes);
+        return new String(strBytes, StandardCharsets.UTF_8);
+      case BLOB:
+        int binLen = buffer.getInt();
+        byte[] binBytes = new byte[binLen];
+        buffer.get(binBytes);
+        return binBytes;
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+  }
+}

Review Comment:
   PreparedParameterSerializer lacks unit tests for its serialization and 
deserialization logic. Consider adding unit tests to verify correct handling of 
all data types, null values, empty parameter lists, and edge cases like very 
large strings or binary data.



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBTablePreparedStatement extends IoTDBStatement implements 
PreparedStatement {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBTablePreparedStatement.class);
+  private static final String METHOD_NOT_SUPPORTED_STRING = "Method not 
supported";
+
+  private final String sql;
+  private final String preparedStatementName;
+  private final int parameterCount;
+
+  // Parameter values stored as objects for binary serialization
+  private final Object[] parameterValues;
+  private final int[] parameterTypes;
+
+  /** save the SQL parameters as (paramLoc,paramValue) pairs for backward 
compatibility. */
+  private final Map<Integer, String> parameters = new HashMap<>();
+
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection,
+      Iface client,
+      Long sessionId,
+      String sql,
+      ZoneId zoneId,
+      Charset charset)
+      throws SQLException {
+    super(connection, client, sessionId, zoneId, charset);
+    this.sql = sql;
+    this.preparedStatementName = generateStatementName();
+
+    // Send PREPARE request to server
+    TSPrepareReq prepareReq = new TSPrepareReq();
+    prepareReq.setSessionId(sessionId);
+    prepareReq.setSql(sql);
+    prepareReq.setStatementName(preparedStatementName);
+
+    try {
+      TSPrepareResp resp = client.prepareStatement(prepareReq);
+      RpcUtils.verifySuccess(resp.getStatus());
+
+      this.parameterCount = resp.isSetParameterCount() ? 
resp.getParameterCount() : 0;
+      this.parameterValues = new Object[parameterCount];
+      this.parameterTypes = new int[parameterCount];
+
+      // Initialize all parameter types to NULL
+      for (int i = 0; i < parameterCount; i++) {
+        parameterTypes[i] = Types.NULL;
+      }
+    } catch (TException | StatementExecutionException e) {
+      throw new SQLException("Failed to prepare statement: " + e.getMessage(), 
e);
+    }
+  }
+
+  // Only for tests
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection, Iface client, Long sessionId, String sql, 
ZoneId zoneId)
+      throws SQLException {
+    this(connection, client, sessionId, sql, zoneId, 
TSFileConfig.STRING_CHARSET);
+  }
+
+  private String generateStatementName() {
+    // StatementId is unique across all sessions in one IoTDB instance
+    return "jdbc_ps_" + getStmtId();
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    super.addBatch(createCompleteSql(sql, parameters));
+  }
+
+  @Override
+  public void clearParameters() {
+    this.parameters.clear();
+    for (int i = 0; i < parameterCount; i++) {
+      parameterValues[i] = null;
+      parameterTypes[i] = Types.NULL;
+    }
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    TSExecuteStatementResp resp = executeInternal();
+    return resp.isSetQueryDataSet() || resp.isSetQueryResult();
+  }
+
+  @Override
+  public ResultSet executeQuery() throws SQLException {
+    TSExecuteStatementResp resp = executeInternal();
+    return processQueryResult(resp);
+  }
+
+  @Override
+  public int executeUpdate() throws SQLException {
+    executeInternal();
+    return 0; // IoTDB doesn't return affected row count
+  }
+
+  private TSExecuteStatementResp executeInternal() throws SQLException {
+    // Validate all parameters are set
+    for (int i = 0; i < parameterCount; i++) {
+      if (parameterTypes[i] == Types.NULL
+          && parameterValues[i] == null
+          && !parameters.containsKey(i + 1)) {
+        throw new SQLException("Parameter #" + (i + 1) + " is unset");
+      }
+    }
+
+    TSExecutePreparedReq req = new TSExecutePreparedReq();
+    req.setSessionId(sessionId);
+    req.setStatementName(preparedStatementName);
+    req.setParameters(
+        PreparedParameterSerializer.serialize(parameterValues, parameterTypes, 
parameterCount));
+    req.setStatementId(getStmtId());
+
+    if (queryTimeout > 0) {
+      req.setTimeout(queryTimeout * 1000L);
+    }
+
+    try {
+      TSExecuteStatementResp resp = client.executePreparedStatement(req);
+      RpcUtils.verifySuccess(resp.getStatus());
+      return resp;
+    } catch (TException e) {
+      throw new SQLException("Failed to execute prepared statement: " + 
e.getMessage(), e);
+    } catch (StatementExecutionException e) {
+      throw new SQLException("Failed to execute prepared statement: " + 
e.getMessage(), e);
+    }
+  }
+
+  private ResultSet processQueryResult(TSExecuteStatementResp resp) throws 
SQLException {
+    if (resp.isSetQueryDataSet() || resp.isSetQueryResult()) {
+      // Create ResultSet from response
+      this.resultSet =
+          new IoTDBJDBCResultSet(
+              this,
+              resp.getColumns(),
+              resp.getDataTypeList(),
+              resp.columnNameIndexMap,
+              resp.ignoreTimeStamp,
+              client,
+              sql,
+              resp.queryId,
+              sessionId,
+              resp.queryResult,
+              resp.tracingInfo,
+              (long) queryTimeout * 1000,
+              resp.isSetMoreData() && resp.isMoreData(),
+              zoneId);
+      return resultSet;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (!isClosed()) {
+      // Deallocate prepared statement on server
+      TSDeallocatePreparedReq req = new TSDeallocatePreparedReq();
+      req.setSessionId(sessionId);
+      req.setStatementName(preparedStatementName);
+
+      try {
+        TSStatus status = client.deallocatePreparedStatement(req);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          logger.warn("Failed to deallocate prepared statement: {}", 
status.getMessage());
+        }
+      } catch (TException e) {
+        logger.warn("Error deallocating prepared statement", e);
+      }
+    }
+    super.close();
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    if (resultSet != null) {
+      return resultSet.getMetaData();
+    }
+    return null;
+  }
+
+  @Override
+  public ParameterMetaData getParameterMetaData() {
+    return new ParameterMetaData() {
+      @Override
+      public int getParameterCount() {
+        return parameterCount;
+      }
+
+      @Override
+      public int isNullable(int param) {
+        return ParameterMetaData.parameterNullableUnknown;
+      }
+
+      @Override
+      public boolean isSigned(int param) {
+        int type = parameterTypes[param - 1];
+        return type == Types.INTEGER
+            || type == Types.BIGINT
+            || type == Types.FLOAT
+            || type == Types.DOUBLE;
+      }
+
+      @Override
+      public int getPrecision(int param) {
+        return 0;
+      }
+
+      @Override
+      public int getScale(int param) {
+        return 0;
+      }
+
+      @Override
+      public int getParameterType(int param) {
+        return parameterTypes[param - 1];
+      }
+
+      @Override
+      public String getParameterTypeName(int param) {
+        return null;
+      }
+
+      @Override
+      public String getParameterClassName(int param) {
+        return null;
+      }
+
+      @Override
+      public int getParameterMode(int param) {
+        return ParameterMetaData.parameterModeIn;
+      }
+
+      @Override
+      public <T> T unwrap(Class<T> iface) {
+        return null;
+      }
+
+      @Override
+      public boolean isWrapperFor(Class<?> iface) {
+        return false;
+      }
+    };
+  }
+
+  // ================== Parameter Setters ==================
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = null;
+    parameterTypes[parameterIndex - 1] = Types.NULL;
+    this.parameters.put(parameterIndex, "NULL");
+  }
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType, String typeName) throws 
SQLException {
+    setNull(parameterIndex, sqlType);
+  }
+
+  @Override
+  public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BOOLEAN;
+    this.parameters.put(parameterIndex, Boolean.toString(x));
+  }
+
+  @Override
+  public void setInt(int parameterIndex, int x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.INTEGER;
+    this.parameters.put(parameterIndex, Integer.toString(x));
+  }
+
+  @Override
+  public void setLong(int parameterIndex, long x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BIGINT;
+    this.parameters.put(parameterIndex, Long.toString(x));
+  }
+
+  @Override
+  public void setFloat(int parameterIndex, float x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.FLOAT;
+    this.parameters.put(parameterIndex, Float.toString(x));
+  }
+
+  @Override
+  public void setDouble(int parameterIndex, double x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.DOUBLE;
+    this.parameters.put(parameterIndex, Double.toString(x));
+  }
+
+  @Override
+  public void setString(int parameterIndex, String x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    if (x == null) {
+      this.parameters.put(parameterIndex, null);
+    } else {
+      this.parameters.put(parameterIndex, "'" + escapeSingleQuotes(x) + "'");
+    }
+  }
+
+  @Override
+  public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BINARY;
+    Binary binary = new Binary(x);
+    this.parameters.put(parameterIndex, 
binary.getStringValue(TSFileConfig.STRING_CHARSET));
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    String dateStr = dateFormat.format(x);
+    parameterValues[parameterIndex - 1] = dateStr;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    this.parameters.put(parameterIndex, "'" + dateStr + "'");
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x, Calendar cal) throws 
SQLException {
+    setDate(parameterIndex, x);
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    try {
+      long time = x.getTime();
+      String timeprecision = client.getProperties().getTimestampPrecision();
+      switch (timeprecision.toLowerCase()) {
+        case "ms":
+          break;
+        case "us":
+          time = time * 1000;
+          break;
+        case "ns":
+          time = time * 1000000;
+          break;
+        default:
+          break;
+      }
+      parameterValues[parameterIndex - 1] = time;
+      parameterTypes[parameterIndex - 1] = Types.BIGINT;
+      this.parameters.put(parameterIndex, Long.toString(time));
+    } catch (TException e) {
+      throw new SQLException("Failed to get time precision: " + 
e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x, Calendar cal) throws 
SQLException {
+    setTime(parameterIndex, x);
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x) throws 
SQLException {
+    checkParameterIndex(parameterIndex);
+    ZonedDateTime zonedDateTime =
+        ZonedDateTime.ofInstant(Instant.ofEpochMilli(x.getTime()), 
super.zoneId);
+    String tsStr = zonedDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+    parameterValues[parameterIndex - 1] = tsStr;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    this.parameters.put(parameterIndex, tsStr);

Review Comment:
   The timestamp string is stored in the parameters map without quotes (line 
448), while strings are stored with quotes (line 381). This inconsistency in 
the parameters map could cause issues if this map is used for SQL string 
construction or backward compatibility. Consider adding quotes around the 
timestamp string for consistency, or document why timestamps don't need quotes.
   ```suggestion
       this.parameters.put(parameterIndex, "\"" + tsStr + "\"");
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -1488,6 +1505,179 @@ public TSStatus closeOperation(TSCloseOperationReq req) 
{
         COORDINATOR::cleanupQueryExecution);
   }
 
+  // ========================= PreparedStatement RPC Methods 
=========================
+
+  @Override
+  public TSPrepareResp prepareStatement(TSPrepareReq req) {
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return new TSPrepareResp(getNotLoggedInStatus());
+    }
+
+    try {
+      String sql = req.getSql();
+      String statementName = req.getStatementName();
+
+      // Parse SQL to get Statement AST
+      org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement =
+          relationSqlParser.createStatement(sql, clientSession.getZoneId(), 
clientSession);
+
+      if (statement == null) {
+        return new TSPrepareResp(
+            RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse 
SQL: " + sql));
+      }
+
+      // Get parameter count before registering
+      int parameterCount = ParameterExtractor.getParameterCount(statement);
+
+      // Register the prepared statement using helper
+      PreparedStatementHelper.register(clientSession, statementName, 
statement);
+
+      TSPrepareResp resp = new 
TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setParameterCount(parameterCount);
+      return resp;
+    } catch (Exception e) {
+      return new TSPrepareResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+  }
+
+  @Override
+  public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq 
req) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      String statementName = req.getStatementName();
+
+      // Deserialize parameters and convert to Literal list
+      List<DeserializedParam> rawParams =
+          
PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters()));
+      List<Literal> parameters = new ArrayList<>(rawParams.size());
+      for (DeserializedParam param : rawParams) {
+        parameters.add(convertToLiteral(param));
+      }
+
+      // Construct Execute AST node, reuse Coordinator's existing Execute 
handling logic
+      Execute executeStatement = new Execute(new Identifier(statementName), 
parameters);
+
+      // Request query ID
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, 
req.getStatementId());
+
+      // Execute using Coordinator (Coordinator internally handles Execute 
statement)
+      long timeout = req.isSetTimeout() ? req.getTimeout() : 
config.getQueryTimeoutThreshold();
+      ExecutionResult result =
+          COORDINATOR.executeForTableModel(
+              executeStatement,
+              relationSqlParser,
+              clientSession,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "EXECUTE " + statementName,
+              metadata,
+              timeout,
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        finished = true;
+        return RpcUtils.getTSExecuteStatementResp(result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution != null && queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          int fetchSize =
+              req.isSetFetchSize() ? req.getFetchSize() : 
config.getThriftMaxFrameSize();
+          finished = setResultForPrepared.apply(resp, queryExecution, 
fetchSize);
+          resp.setMoreData(!finished);
+        } else {
+          finished = true;
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(
+              e, OperationType.EXECUTE_STATEMENT.getName(), 
TSStatusCode.INTERNAL_SERVER_ERROR));
+    } finally {
+      long currentOperationCost = System.nanoTime() - startTime;
+      if (finished) {
+        COORDINATOR.cleanupQueryExecution(queryId, null, t);
+      }
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);

Review Comment:
   If an exception occurs before requestQueryId is called (line 1573), the 
queryId will remain Long.MIN_VALUE and cleanupQueryExecution will be called 
with this invalid ID in the finally block. While this might be handled 
gracefully by the Coordinator, it would be cleaner to only call 
cleanupQueryExecution if a valid queryId was obtained. Consider checking if 
queryId != Long.MIN_VALUE before calling cleanup.
   ```suggestion
         if (queryId != Long.MIN_VALUE) {
           if (finished) {
             COORDINATOR.cleanupQueryExecution(queryId, null, t);
           }
           COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
         }
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementHelper.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.session;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+
+/**
+ * Helper class for managing prepared statement registration and 
unregistration. Provides common
+ * logic shared between RPC methods and ConfigTask implementations.
+ */
+public class PreparedStatementHelper {
+
+  private PreparedStatementHelper() {
+    // Utility class
+  }
+
+  /**
+   * Registers a prepared statement in the session.
+   *
+   * <p>This method performs the following operations:
+   *
+   * <ol>
+   *   <li>Checks if a prepared statement with the same name already exists
+   *   <li>Calculates memory size of the AST
+   *   <li>Allocates memory from PreparedStatementMemoryManager
+   *   <li>Creates and stores PreparedStatementInfo in the session
+   * </ol>
+   *
+   * @param session the client session
+   * @param statementName the name of the prepared statement
+   * @param sql the parsed SQL statement (AST)
+   * @return the created PreparedStatementInfo
+   * @throws SemanticException if a prepared statement with the same name 
already exists
+   */
+  public static PreparedStatementInfo register(
+      IClientSession session, String statementName, Statement sql) {
+    // Check if prepared statement with the same name already exists
+    if (session.getPreparedStatement(statementName) != null) {
+      throw new SemanticException(
+          String.format("Prepared statement '%s' already exists", 
statementName));
+    }
+
+    // Calculate memory size of the AST
+    long memorySizeInBytes = sql == null ? 0L : sql.ramBytesUsed();
+
+    // Allocate memory from PreparedStatementMemoryManager
+    PreparedStatementMemoryManager.getInstance().allocate(statementName, 
memorySizeInBytes);
+
+    // Create and store PreparedStatementInfo
+    PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, 
memorySizeInBytes);
+    session.addPreparedStatement(statementName, info);

Review Comment:
   If session.addPreparedStatement throws an exception after 
PreparedStatementMemoryManager.allocate succeeds, the allocated memory will not 
be released, causing a memory leak. Consider wrapping the addPreparedStatement 
call in a try-catch block and releasing the allocated memory if it fails.
   ```suggestion
       try {
         session.addPreparedStatement(statementName, info);
       } catch (RuntimeException e) {
         // Roll back memory allocation if registration fails
         
PreparedStatementMemoryManager.getInstance().release(memorySizeInBytes);
         throw e;
       }
   ```



##########
iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.jdbc;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IoTDBTablePreparedStatement extends IoTDBStatement implements 
PreparedStatement {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTDBTablePreparedStatement.class);
+  private static final String METHOD_NOT_SUPPORTED_STRING = "Method not 
supported";
+
+  private final String sql;
+  private final String preparedStatementName;
+  private final int parameterCount;
+
+  // Parameter values stored as objects for binary serialization
+  private final Object[] parameterValues;
+  private final int[] parameterTypes;
+
+  /** save the SQL parameters as (paramLoc,paramValue) pairs for backward 
compatibility. */
+  private final Map<Integer, String> parameters = new HashMap<>();
+
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection,
+      Iface client,
+      Long sessionId,
+      String sql,
+      ZoneId zoneId,
+      Charset charset)
+      throws SQLException {
+    super(connection, client, sessionId, zoneId, charset);
+    this.sql = sql;
+    this.preparedStatementName = generateStatementName();
+
+    // Send PREPARE request to server
+    TSPrepareReq prepareReq = new TSPrepareReq();
+    prepareReq.setSessionId(sessionId);
+    prepareReq.setSql(sql);
+    prepareReq.setStatementName(preparedStatementName);
+
+    try {
+      TSPrepareResp resp = client.prepareStatement(prepareReq);
+      RpcUtils.verifySuccess(resp.getStatus());
+
+      this.parameterCount = resp.isSetParameterCount() ? 
resp.getParameterCount() : 0;
+      this.parameterValues = new Object[parameterCount];
+      this.parameterTypes = new int[parameterCount];
+
+      // Initialize all parameter types to NULL
+      for (int i = 0; i < parameterCount; i++) {
+        parameterTypes[i] = Types.NULL;
+      }
+    } catch (TException | StatementExecutionException e) {
+      throw new SQLException("Failed to prepare statement: " + e.getMessage(), 
e);
+    }
+  }
+
+  // Only for tests
+  IoTDBTablePreparedStatement(
+      IoTDBConnection connection, Iface client, Long sessionId, String sql, 
ZoneId zoneId)
+      throws SQLException {
+    this(connection, client, sessionId, sql, zoneId, 
TSFileConfig.STRING_CHARSET);
+  }
+
+  private String generateStatementName() {
+    // StatementId is unique across all sessions in one IoTDB instance
+    return "jdbc_ps_" + getStmtId();
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    super.addBatch(createCompleteSql(sql, parameters));
+  }
+
+  @Override
+  public void clearParameters() {
+    this.parameters.clear();
+    for (int i = 0; i < parameterCount; i++) {
+      parameterValues[i] = null;
+      parameterTypes[i] = Types.NULL;
+    }
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    TSExecuteStatementResp resp = executeInternal();
+    return resp.isSetQueryDataSet() || resp.isSetQueryResult();
+  }
+
+  @Override
+  public ResultSet executeQuery() throws SQLException {
+    TSExecuteStatementResp resp = executeInternal();
+    return processQueryResult(resp);
+  }
+
+  @Override
+  public int executeUpdate() throws SQLException {
+    executeInternal();
+    return 0; // IoTDB doesn't return affected row count
+  }
+
+  private TSExecuteStatementResp executeInternal() throws SQLException {
+    // Validate all parameters are set
+    for (int i = 0; i < parameterCount; i++) {
+      if (parameterTypes[i] == Types.NULL
+          && parameterValues[i] == null
+          && !parameters.containsKey(i + 1)) {
+        throw new SQLException("Parameter #" + (i + 1) + " is unset");
+      }
+    }
+
+    TSExecutePreparedReq req = new TSExecutePreparedReq();
+    req.setSessionId(sessionId);
+    req.setStatementName(preparedStatementName);
+    req.setParameters(
+        PreparedParameterSerializer.serialize(parameterValues, parameterTypes, 
parameterCount));
+    req.setStatementId(getStmtId());
+
+    if (queryTimeout > 0) {
+      req.setTimeout(queryTimeout * 1000L);
+    }
+
+    try {
+      TSExecuteStatementResp resp = client.executePreparedStatement(req);
+      RpcUtils.verifySuccess(resp.getStatus());
+      return resp;
+    } catch (TException e) {
+      throw new SQLException("Failed to execute prepared statement: " + 
e.getMessage(), e);
+    } catch (StatementExecutionException e) {
+      throw new SQLException("Failed to execute prepared statement: " + 
e.getMessage(), e);
+    }
+  }
+
+  private ResultSet processQueryResult(TSExecuteStatementResp resp) throws 
SQLException {
+    if (resp.isSetQueryDataSet() || resp.isSetQueryResult()) {
+      // Create ResultSet from response
+      this.resultSet =
+          new IoTDBJDBCResultSet(
+              this,
+              resp.getColumns(),
+              resp.getDataTypeList(),
+              resp.columnNameIndexMap,
+              resp.ignoreTimeStamp,
+              client,
+              sql,
+              resp.queryId,
+              sessionId,
+              resp.queryResult,
+              resp.tracingInfo,
+              (long) queryTimeout * 1000,
+              resp.isSetMoreData() && resp.isMoreData(),
+              zoneId);
+      return resultSet;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (!isClosed()) {
+      // Deallocate prepared statement on server
+      TSDeallocatePreparedReq req = new TSDeallocatePreparedReq();
+      req.setSessionId(sessionId);
+      req.setStatementName(preparedStatementName);
+
+      try {
+        TSStatus status = client.deallocatePreparedStatement(req);
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          logger.warn("Failed to deallocate prepared statement: {}", 
status.getMessage());
+        }
+      } catch (TException e) {
+        logger.warn("Error deallocating prepared statement", e);
+      }
+    }
+    super.close();
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    if (resultSet != null) {
+      return resultSet.getMetaData();
+    }
+    return null;
+  }
+
+  @Override
+  public ParameterMetaData getParameterMetaData() {
+    return new ParameterMetaData() {
+      @Override
+      public int getParameterCount() {
+        return parameterCount;
+      }
+
+      @Override
+      public int isNullable(int param) {
+        return ParameterMetaData.parameterNullableUnknown;
+      }
+
+      @Override
+      public boolean isSigned(int param) {
+        int type = parameterTypes[param - 1];
+        return type == Types.INTEGER
+            || type == Types.BIGINT
+            || type == Types.FLOAT
+            || type == Types.DOUBLE;
+      }
+
+      @Override
+      public int getPrecision(int param) {
+        return 0;
+      }
+
+      @Override
+      public int getScale(int param) {
+        return 0;
+      }
+
+      @Override
+      public int getParameterType(int param) {
+        return parameterTypes[param - 1];
+      }
+
+      @Override
+      public String getParameterTypeName(int param) {
+        return null;
+      }
+
+      @Override
+      public String getParameterClassName(int param) {
+        return null;
+      }
+
+      @Override
+      public int getParameterMode(int param) {
+        return ParameterMetaData.parameterModeIn;
+      }
+
+      @Override
+      public <T> T unwrap(Class<T> iface) {
+        return null;
+      }
+
+      @Override
+      public boolean isWrapperFor(Class<?> iface) {
+        return false;
+      }
+    };
+  }
+
+  // ================== Parameter Setters ==================
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = null;
+    parameterTypes[parameterIndex - 1] = Types.NULL;
+    this.parameters.put(parameterIndex, "NULL");
+  }
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType, String typeName) throws 
SQLException {
+    setNull(parameterIndex, sqlType);
+  }
+
+  @Override
+  public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BOOLEAN;
+    this.parameters.put(parameterIndex, Boolean.toString(x));
+  }
+
+  @Override
+  public void setInt(int parameterIndex, int x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.INTEGER;
+    this.parameters.put(parameterIndex, Integer.toString(x));
+  }
+
+  @Override
+  public void setLong(int parameterIndex, long x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BIGINT;
+    this.parameters.put(parameterIndex, Long.toString(x));
+  }
+
+  @Override
+  public void setFloat(int parameterIndex, float x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.FLOAT;
+    this.parameters.put(parameterIndex, Float.toString(x));
+  }
+
+  @Override
+  public void setDouble(int parameterIndex, double x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.DOUBLE;
+    this.parameters.put(parameterIndex, Double.toString(x));
+  }
+
+  @Override
+  public void setString(int parameterIndex, String x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    if (x == null) {
+      this.parameters.put(parameterIndex, null);
+    } else {
+      this.parameters.put(parameterIndex, "'" + escapeSingleQuotes(x) + "'");
+    }
+  }
+
+  @Override
+  public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    parameterValues[parameterIndex - 1] = x;
+    parameterTypes[parameterIndex - 1] = Types.BINARY;
+    Binary binary = new Binary(x);
+    this.parameters.put(parameterIndex, 
binary.getStringValue(TSFileConfig.STRING_CHARSET));
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    String dateStr = dateFormat.format(x);
+    parameterValues[parameterIndex - 1] = dateStr;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    this.parameters.put(parameterIndex, "'" + dateStr + "'");
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x, Calendar cal) throws 
SQLException {
+    setDate(parameterIndex, x);
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x) throws SQLException {
+    checkParameterIndex(parameterIndex);
+    try {
+      long time = x.getTime();
+      String timeprecision = client.getProperties().getTimestampPrecision();
+      switch (timeprecision.toLowerCase()) {
+        case "ms":
+          break;
+        case "us":
+          time = time * 1000;
+          break;
+        case "ns":
+          time = time * 1000000;
+          break;
+        default:
+          break;
+      }
+      parameterValues[parameterIndex - 1] = time;
+      parameterTypes[parameterIndex - 1] = Types.BIGINT;
+      this.parameters.put(parameterIndex, Long.toString(time));
+    } catch (TException e) {
+      throw new SQLException("Failed to get time precision: " + 
e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x, Calendar cal) throws 
SQLException {
+    setTime(parameterIndex, x);
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x) throws 
SQLException {
+    checkParameterIndex(parameterIndex);
+    ZonedDateTime zonedDateTime =
+        ZonedDateTime.ofInstant(Instant.ofEpochMilli(x.getTime()), 
super.zoneId);
+    String tsStr = zonedDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+    parameterValues[parameterIndex - 1] = tsStr;
+    parameterTypes[parameterIndex - 1] = Types.VARCHAR;
+    this.parameters.put(parameterIndex, tsStr);
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) 
throws SQLException {
+    setTimestamp(parameterIndex, x);
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x) throws SQLException {
+    if (x == null) {
+      setNull(parameterIndex, Types.NULL);
+    } else if (x instanceof String) {
+      setString(parameterIndex, (String) x);
+    } else if (x instanceof Integer) {
+      setInt(parameterIndex, (Integer) x);
+    } else if (x instanceof Long) {
+      setLong(parameterIndex, (Long) x);
+    } else if (x instanceof Float) {
+      setFloat(parameterIndex, (Float) x);
+    } else if (x instanceof Double) {
+      setDouble(parameterIndex, (Double) x);
+    } else if (x instanceof Boolean) {
+      setBoolean(parameterIndex, (Boolean) x);
+    } else if (x instanceof Timestamp) {
+      setTimestamp(parameterIndex, (Timestamp) x);
+    } else if (x instanceof Date) {
+      setDate(parameterIndex, (Date) x);
+    } else if (x instanceof Time) {
+      setTime(parameterIndex, (Time) x);
+    } else if (x instanceof byte[]) {
+      setBytes(parameterIndex, (byte[]) x);
+    } else {
+      throw new SQLException(
+          String.format(
+              "Can't infer the SQL type for an instance of %s. Use setObject() 
with explicit type.",
+              x.getClass().getName()));
+    }
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType) 
throws SQLException {
+    setObject(parameterIndex, x);
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object parameterObj, int 
targetSqlType, int scale)
+      throws SQLException {
+    setObject(parameterIndex, parameterObj);
+  }
+
+  private void checkParameterIndex(int index) throws SQLException {
+    if (index < 1 || index > parameterCount) {
+      throw new SQLException(
+          "Parameter index out of range: " + index + " (expected 1-" + 
parameterCount + ")");
+    }
+  }
+
+  private String escapeSingleQuotes(String value) {
+    return value.replace("'", "''");
+  }
+
+  // ================== Unsupported Methods ==================
+
+  @Override
+  public void setArray(int parameterIndex, Array x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, int length) 
throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, long length) 
throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBigDecimal(int parameterIndex, BigDecimal x) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, int length) 
throws SQLException {
+    try {
+      byte[] bytes = ReadWriteIOUtils.readBytes(x, length);
+      setBytes(parameterIndex, bytes);
+    } catch (IOException e) {
+      throw new SQLException("Failed to read binary stream: " + 
e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, long length) 
throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, Blob x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream, long length)
+      throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setByte(int parameterIndex, byte x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, int length)
+      throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, long 
length)
+      throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Clob x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader, long length) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value, long 
length)
+      throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, NClob value) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader, long length) throws 
SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setNString(int parameterIndex, String value) throws SQLException 
{
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setRef(int parameterIndex, Ref x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setRowId(int parameterIndex, RowId x) throws SQLException {
+    throw new SQLException(METHOD_NOT_SUPPORTED_STRING);
+  }
+
+  @Override
+  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws 
SQLException {
+    throw new SQLException(METHOD_NOT_SUPPORTED_STRING);
+  }
+
+  @Override
+  public void setShort(int parameterIndex, short x) throws SQLException {
+    setInt(parameterIndex, x);
+  }
+
+  @Override
+  public void setURL(int parameterIndex, URL x) throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  @Override
+  public void setUnicodeStream(int parameterIndex, InputStream x, int length) 
throws SQLException {
+    throw new SQLException(Constant.PARAMETER_SUPPORTED);
+  }
+
+  // ================== Helper Methods for Backward Compatibility 
==================
+
+  private String createCompleteSql(final String sql, Map<Integer, String> 
parameters)
+      throws SQLException {
+    List<String> parts = splitSqlStatement(sql);
+
+    StringBuilder newSql = new StringBuilder(parts.get(0));
+    for (int i = 1; i < parts.size(); i++) {
+      if (!parameters.containsKey(i)) {
+        throw new SQLException("Parameter #" + i + " is unset");
+      }
+      newSql.append(parameters.get(i));
+      newSql.append(parts.get(i));
+    }
+    return newSql.toString();
+  }
+
+  private List<String> splitSqlStatement(final String sql) {
+    List<String> parts = new ArrayList<>();
+    int apCount = 0;
+    int off = 0;
+    boolean skip = false;
+
+    for (int i = 0; i < sql.length(); i++) {
+      char c = sql.charAt(i);
+      if (skip) {
+        skip = false;
+        continue;
+      }
+      switch (c) {
+        case '\'':
+          apCount++;
+          break;
+        case '\\':
+          skip = true;
+          break;
+        case '?':
+          if ((apCount & 1) == 0) {
+            parts.add(sql.substring(off, i));
+            off = i + 1;
+          }
+          break;
+        default:
+          break;
+      }
+    }
+    parts.add(sql.substring(off));
+    return parts;
+  }
+}

Review Comment:
   The new IoTDBTablePreparedStatement class and the RPC methods 
(prepareStatement, executePreparedStatement, deallocatePreparedStatement) lack 
integration tests. While unit tests exist for the client-side implementation 
and integration tests exist for the SQL PREPARE/EXECUTE syntax, there are no 
tests that exercise the JDBC PreparedStatement API end-to-end. Consider adding 
integration tests that use the JDBC API to prepare and execute statements with 
parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to