JackieTien97 commented on code in PR #17027: URL: https://github.com/apache/iotdb/pull/17027#discussion_r2752367539
########## iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java: ########## @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.jdbc; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface; +import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSPrepareReq; +import org.apache.iotdb.service.rpc.thrift.TSPrepareResp; + +import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.nio.charset.Charset; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IoTDBTablePreparedStatement extends IoTDBStatement implements PreparedStatement { + + private static final Logger logger = LoggerFactory.getLogger(IoTDBTablePreparedStatement.class); + private static final String METHOD_NOT_SUPPORTED_STRING = "Method not supported"; + + private final String sql; + private final String preparedStatementName; + private final int parameterCount; + private final boolean serverSidePrepared; + + private final Object[] parameterValues; + private final int[] parameterTypes; + + // retain parameters for backward compatibility + private final Map<Integer, String> parameters = new HashMap<>(); + + IoTDBTablePreparedStatement( + IoTDBConnection connection, + Iface client, + Long sessionId, + String sql, + ZoneId zoneId, + Charset charset) + throws SQLException { + super(connection, client, sessionId, zoneId, charset); + this.sql = sql; + this.preparedStatementName = generateStatementName(); + + if (isQueryStatement(sql)) { + // Send PREPARE request to server only for query statements + this.serverSidePrepared = true; + TSPrepareReq prepareReq = new TSPrepareReq(); + prepareReq.setSessionId(sessionId); + prepareReq.setSql(sql); + prepareReq.setStatementName(preparedStatementName); + + try { + TSPrepareResp resp = client.prepareStatement(prepareReq); + RpcUtils.verifySuccess(resp.getStatus()); + + this.parameterCount = resp.isSetParameterCount() ? resp.getParameterCount() : 0; + this.parameterValues = new Object[parameterCount]; + this.parameterTypes = new int[parameterCount]; + + for (int i = 0; i < parameterCount; i++) { + parameterTypes[i] = Types.NULL; + } + } catch (TException | StatementExecutionException e) { + throw new SQLException("Failed to prepare statement: " + e.getMessage(), e); + } + } else { + // For non-query statements, only keep text parameters for client-side substitution. + this.serverSidePrepared = false; + this.parameterCount = 0; + this.parameterValues = null; + this.parameterTypes = null; + } + } + + // Only for tests + IoTDBTablePreparedStatement( + IoTDBConnection connection, Iface client, Long sessionId, String sql, ZoneId zoneId) + throws SQLException { + this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET); + } + + private String generateStatementName() { + // StatementId is unique across all sessions in one IoTDB instance + return "jdbc_ps_" + getStmtId(); + } + + @Override + public void addBatch() throws SQLException { + super.addBatch(createCompleteSql(sql, parameters)); + } + + @Override + public void clearParameters() { + this.parameters.clear(); + if (serverSidePrepared) { + for (int i = 0; i < parameterCount; i++) { + parameterValues[i] = null; + parameterTypes[i] = Types.NULL; + } + } + } + + @Override + public boolean execute() throws SQLException { + if (isQueryStatement(sql)) { + TSExecuteStatementResp resp = executeInternal(); + return resp.isSetQueryDataSet() || resp.isSetQueryResult(); + } else { + return super.execute(createCompleteSql(sql, parameters)); + } + } + + private boolean isQueryStatement(String sql) { + if (sql == null) { + return false; + } + String trimmedSql = sql.trim().toUpperCase(); + return trimmedSql.startsWith("SELECT"); Review Comment: Not right, we also support with clause in iotdb now ########## iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift: ########## @@ -167,6 +167,34 @@ struct TSCloseOperationReq { 4: optional string preparedStatementName } +// PREPARE +struct TSPrepareReq { + 1: required i64 sessionId + 2: required string sql + 3: required string statementName +} + +struct TSPrepareResp { + 1: required common.TSStatus status + 2: optional i32 parameterCount Review Comment: ```suggestion 3: optional i32 datanode_id ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); + return resp; + } catch (Exception e) { + return new TSPrepareResp( + onQueryException( + e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq req) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + + long startTime = System.nanoTime(); + Throwable t = null; + try { + String statementName = req.getStatementName(); + + List<DeserializedParam> rawParams = + PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters())); + List<Literal> parameters = new ArrayList<>(rawParams.size()); + for (DeserializedParam param : rawParams) { + parameters.add(convertToLiteral(param)); + } + + Execute executeStatement = new Execute(new Identifier(statementName), parameters); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.getStatementId()); + + long timeout = req.isSetTimeout() ? req.getTimeout() : config.getQueryTimeoutThreshold(); + ExecutionResult result = + COORDINATOR.executeForTableModel( + executeStatement, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "EXECUTE " + statementName, + metadata, + timeout, + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + finished = true; + return RpcUtils.getTSExecuteStatementResp(result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution != null && queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + int fetchSize = + req.isSetFetchSize() ? req.getFetchSize() : config.getThriftMaxFrameSize(); + finished = setResultForPrepared.apply(resp, queryExecution, fetchSize); + resp.setMoreData(!finished); Review Comment: ```suggestion resp.setMoreData(!finished); if (quota != null) { quota.addReadResult(resp.getQueryResult()); } // Should return SUCCESS_MESSAGE for insert into query if (queryExecution.getQueryType() == QueryType.READ_WRITE) { resp.setColumns(null); } ``` add quota check ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementHelper.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +/** Helper for prepared statement registration/unregistration. */ +public class PreparedStatementHelper { + + private PreparedStatementHelper() {} + + /** Registers a prepared statement in the session. */ + public static PreparedStatementInfo register( + IClientSession session, String statementName, Statement sql) { + if (session.getPreparedStatement(statementName) != null) { + throw new SemanticException( + String.format("Prepared statement '%s' already exists", statementName)); + } + + long memorySizeInBytes = sql == null ? 0L : sql.ramBytesUsed(); + + PreparedStatementMemoryManager.getInstance().allocate(statementName, memorySizeInBytes); + + PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memorySizeInBytes); + session.addPreparedStatement(statementName, info); + + return info; + } + + /** Unregisters a prepared statement from the session. */ + public static PreparedStatementInfo unregister(IClientSession session, String statementName) { Review Comment: ```suggestion public static void unregister(IClientSession session, String statementName) { ``` return value is never used. ########## iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc.stmt; + +import org.apache.tsfile.enums.TSDataType; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +/** Serializer for PreparedStatement parameters. */ +public class PreparedParameterSerializer { Review Comment: ```suggestion public class PreparedParameterSerDe { ``` Not only Serializer, but also Deserializer ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); Review Comment: ```suggestion String statementName = IoTDBDescriptor.getInstance().getConfig().getDataNodeId() + req.getStatementName(); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); Review Comment: ```suggestion resp.setParameterCount(parameterCount); resp.setDataNodeId(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); ``` ########## iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java: ########## @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.jdbc; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface; +import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSPrepareReq; +import org.apache.iotdb.service.rpc.thrift.TSPrepareResp; + +import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.nio.charset.Charset; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IoTDBTablePreparedStatement extends IoTDBStatement implements PreparedStatement { + + private static final Logger logger = LoggerFactory.getLogger(IoTDBTablePreparedStatement.class); + private static final String METHOD_NOT_SUPPORTED_STRING = "Method not supported"; + + private final String sql; + private final String preparedStatementName; + private final int parameterCount; + private final boolean serverSidePrepared; + + private final Object[] parameterValues; + private final int[] parameterTypes; + + // retain parameters for backward compatibility + private final Map<Integer, String> parameters = new HashMap<>(); + + IoTDBTablePreparedStatement( + IoTDBConnection connection, + Iface client, + Long sessionId, + String sql, + ZoneId zoneId, + Charset charset) + throws SQLException { + super(connection, client, sessionId, zoneId, charset); + this.sql = sql; + this.preparedStatementName = generateStatementName(); + + if (isQueryStatement(sql)) { + // Send PREPARE request to server only for query statements + this.serverSidePrepared = true; + TSPrepareReq prepareReq = new TSPrepareReq(); + prepareReq.setSessionId(sessionId); + prepareReq.setSql(sql); + prepareReq.setStatementName(preparedStatementName); + + try { + TSPrepareResp resp = client.prepareStatement(prepareReq); + RpcUtils.verifySuccess(resp.getStatus()); + + this.parameterCount = resp.isSetParameterCount() ? resp.getParameterCount() : 0; + this.parameterValues = new Object[parameterCount]; + this.parameterTypes = new int[parameterCount]; + + for (int i = 0; i < parameterCount; i++) { + parameterTypes[i] = Types.NULL; + } + } catch (TException | StatementExecutionException e) { + throw new SQLException("Failed to prepare statement: " + e.getMessage(), e); + } + } else { + // For non-query statements, only keep text parameters for client-side substitution. + this.serverSidePrepared = false; + this.parameterCount = 0; + this.parameterValues = null; + this.parameterTypes = null; + } + } + + // Only for tests + IoTDBTablePreparedStatement( + IoTDBConnection connection, Iface client, Long sessionId, String sql, ZoneId zoneId) + throws SQLException { + this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET); + } + + private String generateStatementName() { + // StatementId is unique across all sessions in one IoTDB instance + return "jdbc_ps_" + getStmtId(); Review Comment: in this way, it's not always unique across all sessions in one IoTDB instance, you should add a datanodeid_ prefix. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); + return resp; + } catch (Exception e) { + return new TSPrepareResp( + onQueryException( + e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq req) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + + long startTime = System.nanoTime(); + Throwable t = null; + try { + String statementName = req.getStatementName(); + + List<DeserializedParam> rawParams = + PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters())); + List<Literal> parameters = new ArrayList<>(rawParams.size()); + for (DeserializedParam param : rawParams) { + parameters.add(convertToLiteral(param)); + } + + Execute executeStatement = new Execute(new Identifier(statementName), parameters); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.getStatementId()); + + long timeout = req.isSetTimeout() ? req.getTimeout() : config.getQueryTimeoutThreshold(); + ExecutionResult result = + COORDINATOR.executeForTableModel( + executeStatement, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "EXECUTE " + statementName, + metadata, + timeout, + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + finished = true; + return RpcUtils.getTSExecuteStatementResp(result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution != null && queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + int fetchSize = + req.isSetFetchSize() ? req.getFetchSize() : config.getThriftMaxFrameSize(); + finished = setResultForPrepared.apply(resp, queryExecution, fetchSize); + resp.setMoreData(!finished); + } else { + finished = true; + resp = RpcUtils.getTSExecuteStatementResp(result.status); + } + return resp; + } + } catch (Exception e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + onQueryException( + e, + OperationType.EXECUTE_PREPARED_STATEMENT.getName(), + TSStatusCode.INTERNAL_SERVER_ERROR)); + } finally { Review Comment: catch error here, do exactly what executeStatementInternal does ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); + return resp; + } catch (Exception e) { + return new TSPrepareResp( + onQueryException( + e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq req) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + + long startTime = System.nanoTime(); + Throwable t = null; + try { + String statementName = req.getStatementName(); + + List<DeserializedParam> rawParams = + PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters())); + List<Literal> parameters = new ArrayList<>(rawParams.size()); + for (DeserializedParam param : rawParams) { + parameters.add(convertToLiteral(param)); + } + + Execute executeStatement = new Execute(new Identifier(statementName), parameters); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.getStatementId()); + + long timeout = req.isSetTimeout() ? req.getTimeout() : config.getQueryTimeoutThreshold(); + ExecutionResult result = + COORDINATOR.executeForTableModel( + executeStatement, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "EXECUTE " + statementName, + metadata, + timeout, + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + finished = true; + return RpcUtils.getTSExecuteStatementResp(result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution != null && queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + int fetchSize = + req.isSetFetchSize() ? req.getFetchSize() : config.getThriftMaxFrameSize(); + finished = setResultForPrepared.apply(resp, queryExecution, fetchSize); + resp.setMoreData(!finished); + } else { + finished = true; + resp = RpcUtils.getTSExecuteStatementResp(result.status); + } + return resp; + } + } catch (Exception e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + onQueryException( + e, + OperationType.EXECUTE_PREPARED_STATEMENT.getName(), + TSStatusCode.INTERNAL_SERVER_ERROR)); + } finally { + long currentOperationCost = System.nanoTime() - startTime; + if (finished) { + COORDINATOR.cleanupQueryExecution(queryId, null, t); + } + COORDINATOR.recordExecutionTime(queryId, currentOperationCost); + } + } + + @Override + public TSStatus deallocatePreparedStatement(TSDeallocatePreparedReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return getNotLoggedInStatus(); + } + + try { + PreparedStatementHelper.unregister(clientSession, req.getStatementName()); + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } catch (Exception e) { + return onQueryException( + e, + OperationType.DEALLOCATE_PREPARED_STATEMENT.getName(), + TSStatusCode.INTERNAL_SERVER_ERROR); + } + } + + private Literal convertToLiteral(DeserializedParam param) { + if (param.isNull()) { + return new NullLiteral(); + } + + switch (param.type) { + case BOOLEAN: + return new BooleanLiteral((Boolean) param.value ? "true" : "false"); Review Comment: ```suggestion return new BooleanLiteral((boolean) param.value ? "true" : "false"); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); + return resp; + } catch (Exception e) { + return new TSPrepareResp( + onQueryException( + e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq req) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + + long startTime = System.nanoTime(); + Throwable t = null; + try { + String statementName = req.getStatementName(); + + List<DeserializedParam> rawParams = + PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters())); + List<Literal> parameters = new ArrayList<>(rawParams.size()); + for (DeserializedParam param : rawParams) { + parameters.add(convertToLiteral(param)); + } + + Execute executeStatement = new Execute(new Identifier(statementName), parameters); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.getStatementId()); + + long timeout = req.isSetTimeout() ? req.getTimeout() : config.getQueryTimeoutThreshold(); + ExecutionResult result = + COORDINATOR.executeForTableModel( + executeStatement, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "EXECUTE " + statementName, Review Comment: not enough? should contain all the constants? You can return it in `convertToLiteral` method. ########## iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerializer.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc.stmt; + +import org.apache.tsfile.enums.TSDataType; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +/** Serializer for PreparedStatement parameters. */ +public class PreparedParameterSerializer { + + public static class DeserializedParam { + public final TSDataType type; + public final Object value; + + DeserializedParam(TSDataType type, Object value) { + this.type = type; + this.value = value; + } + + public boolean isNull() { + return type == TSDataType.UNKNOWN || value == null; + } + } + + private PreparedParameterSerializer() {} + + /** Serialize parameters to binary format. */ + public static ByteBuffer serialize(Object[] values, int[] jdbcTypes, int count) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + dos.writeInt(count); + for (int i = 0; i < count; i++) { + serializeParameter(dos, values[i], jdbcTypes[i]); + } + + dos.flush(); + return ByteBuffer.wrap(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize parameters", e); + } + } + + private static void serializeParameter(DataOutputStream dos, Object value, int jdbcType) + throws IOException { + if (value == null || jdbcType == Types.NULL) { + dos.writeByte(TSDataType.UNKNOWN.serialize()); + return; + } + + switch (jdbcType) { + case Types.BOOLEAN: + dos.writeByte(TSDataType.BOOLEAN.serialize()); + dos.writeByte((Boolean) value ? 1 : 0); Review Comment: use `ReadWriteIOUtils.write()` and `ReadWriteIOUtils.read(XXX)` instead of defining your own serde way. ########## integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBTablePreparedStatementJDBCIT.java: ########## Review Comment: add test case for each supported data type defined in TsDataType. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java: ########## @@ -1488,6 +1505,174 @@ public TSStatus closeOperation(TSCloseOperationReq req) { COORDINATOR::cleanupQueryExecution); } + // ========================= PreparedStatement RPC Methods ========================= + + @Override + public TSPrepareResp prepareStatement(TSPrepareReq req) { + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSPrepareResp(getNotLoggedInStatus()); + } + + try { + String sql = req.getSql(); + String statementName = req.getStatementName(); + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement = + relationSqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (statement == null) { + return new TSPrepareResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, "Failed to parse SQL: " + sql)); + } + + int parameterCount = ParameterExtractor.getParameterCount(statement); + + PreparedStatementHelper.register(clientSession, statementName, statement); + + TSPrepareResp resp = new TSPrepareResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setParameterCount(parameterCount); + return resp; + } catch (Exception e) { + return new TSPrepareResp( + onQueryException( + e, OperationType.PREPARE_STATEMENT.getName(), TSStatusCode.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public TSExecuteStatementResp executePreparedStatement(TSExecutePreparedReq req) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + + long startTime = System.nanoTime(); + Throwable t = null; + try { + String statementName = req.getStatementName(); + + List<DeserializedParam> rawParams = + PreparedParameterSerializer.deserialize(ByteBuffer.wrap(req.getParameters())); + List<Literal> parameters = new ArrayList<>(rawParams.size()); + for (DeserializedParam param : rawParams) { + parameters.add(convertToLiteral(param)); + } + + Execute executeStatement = new Execute(new Identifier(statementName), parameters); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.getStatementId()); + + long timeout = req.isSetTimeout() ? req.getTimeout() : config.getQueryTimeoutThreshold(); + ExecutionResult result = + COORDINATOR.executeForTableModel( + executeStatement, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "EXECUTE " + statementName, + metadata, + timeout, + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + finished = true; + return RpcUtils.getTSExecuteStatementResp(result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution != null && queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + int fetchSize = + req.isSetFetchSize() ? req.getFetchSize() : config.getThriftMaxFrameSize(); + finished = setResultForPrepared.apply(resp, queryExecution, fetchSize); + resp.setMoreData(!finished); + } else { + finished = true; + resp = RpcUtils.getTSExecuteStatementResp(result.status); + } + return resp; + } + } catch (Exception e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + onQueryException( + e, + OperationType.EXECUTE_PREPARED_STATEMENT.getName(), + TSStatusCode.INTERNAL_SERVER_ERROR)); + } finally { + long currentOperationCost = System.nanoTime() - startTime; + if (finished) { + COORDINATOR.cleanupQueryExecution(queryId, null, t); + } + COORDINATOR.recordExecutionTime(queryId, currentOperationCost); Review Comment: why don't you follow the `executeStatementInternal` way? ########## iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java: ########## @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.jdbc; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.stmt.PreparedParameterSerializer; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface; +import org.apache.iotdb.service.rpc.thrift.TSDeallocatePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSPrepareReq; +import org.apache.iotdb.service.rpc.thrift.TSPrepareResp; + +import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.nio.charset.Charset; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IoTDBTablePreparedStatement extends IoTDBStatement implements PreparedStatement { + + private static final Logger logger = LoggerFactory.getLogger(IoTDBTablePreparedStatement.class); + private static final String METHOD_NOT_SUPPORTED_STRING = "Method not supported"; + + private final String sql; + private final String preparedStatementName; + private final int parameterCount; + private final boolean serverSidePrepared; + + private final Object[] parameterValues; + private final int[] parameterTypes; + + // retain parameters for backward compatibility + private final Map<Integer, String> parameters = new HashMap<>(); + + IoTDBTablePreparedStatement( + IoTDBConnection connection, + Iface client, + Long sessionId, + String sql, + ZoneId zoneId, + Charset charset) + throws SQLException { + super(connection, client, sessionId, zoneId, charset); + this.sql = sql; + this.preparedStatementName = generateStatementName(); + + if (isQueryStatement(sql)) { + // Send PREPARE request to server only for query statements + this.serverSidePrepared = true; + TSPrepareReq prepareReq = new TSPrepareReq(); + prepareReq.setSessionId(sessionId); + prepareReq.setSql(sql); + prepareReq.setStatementName(preparedStatementName); + + try { + TSPrepareResp resp = client.prepareStatement(prepareReq); + RpcUtils.verifySuccess(resp.getStatus()); + + this.parameterCount = resp.isSetParameterCount() ? resp.getParameterCount() : 0; + this.parameterValues = new Object[parameterCount]; + this.parameterTypes = new int[parameterCount]; + + for (int i = 0; i < parameterCount; i++) { + parameterTypes[i] = Types.NULL; + } + } catch (TException | StatementExecutionException e) { + throw new SQLException("Failed to prepare statement: " + e.getMessage(), e); + } + } else { + // For non-query statements, only keep text parameters for client-side substitution. + this.serverSidePrepared = false; + this.parameterCount = 0; + this.parameterValues = null; + this.parameterTypes = null; + } + } + + // Only for tests + IoTDBTablePreparedStatement( + IoTDBConnection connection, Iface client, Long sessionId, String sql, ZoneId zoneId) + throws SQLException { + this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET); + } + + private String generateStatementName() { + // StatementId is unique across all sessions in one IoTDB instance + return "jdbc_ps_" + getStmtId(); + } + + @Override + public void addBatch() throws SQLException { + super.addBatch(createCompleteSql(sql, parameters)); + } + + @Override + public void clearParameters() { + this.parameters.clear(); + if (serverSidePrepared) { + for (int i = 0; i < parameterCount; i++) { + parameterValues[i] = null; + parameterTypes[i] = Types.NULL; + } + } + } + + @Override + public boolean execute() throws SQLException { + if (isQueryStatement(sql)) { + TSExecuteStatementResp resp = executeInternal(); + return resp.isSetQueryDataSet() || resp.isSetQueryResult(); + } else { + return super.execute(createCompleteSql(sql, parameters)); + } + } + + private boolean isQueryStatement(String sql) { + if (sql == null) { + return false; + } + String trimmedSql = sql.trim().toUpperCase(); + return trimmedSql.startsWith("SELECT"); Review Comment: You should catch specific exception while calling `client.prepareStatement` method, depending on the server resp to decide whether this is a non-query sql, and then degrade to previous client-side way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
