This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ExecuteBatchStatement in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a016db16976bddf9e1c915df17b03c69004c3b96 Author: JackieTien97 <jackietie...@gmail.com> AuthorDate: Tue Jul 23 09:40:59 2024 +0800 Support executeBatch in Table Model --- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 11 +++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../protocol/thrift/impl/ClientRPCServiceImpl.java | 105 +++++++++++++++------ 3 files changed, 88 insertions(+), 30 deletions(-) diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index ad37ca271a9..e7e045252e3 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -407,6 +407,17 @@ public class IoTDBStatement implements Statement { message.append(execResp.getMessage()); } } + if (execResp.isSetSubStatus() && execResp.getSubStatus() != null) { + for (TSStatus status : execResp.getSubStatus()) { + if (status.getCode() == TSStatusCode.USE_DB.getStatusCode() + && status.isSetMessage() + && status.getMessage() != null + && !status.getMessage().isEmpty()) { + connection.changeDefaultDatabase(status.getMessage()); + break; + } + } + } if (!allSuccess) { throw new BatchUpdateException(message.toString(), result); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index d41b247cdd2..6ecbb3061b2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -53,6 +53,8 @@ public enum TSStatusCode { // Client, REDIRECTION_RECOMMEND(400), + USE_DB(401), + // Schema Engine DATABASE_NOT_EXIST(500), DATABASE_ALREADY_EXISTS(501), diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 83cf79a5c68..94a683e1165 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1624,6 +1624,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return getNotLoggedInStatus(); } + boolean useDatabase = false; try { for (int i = 0; i < req.getStatements().size(); i++) { String statement = req.getStatements().get(i); @@ -1631,37 +1632,72 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { String type = null; OperationQuota quota = null; try { - Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); - if (s == null) { - return RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); - } - // permission check - TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return status; - } + long queryId; + ExecutionResult result; + if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) { + Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); + if (s == null) { + return RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported"); + } + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } - quota = - DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + quota = + DataNodeThrottleQuotaManager.getInstance() + 
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); - if (ENABLE_AUDIT_LOG) { - AuditLogger.log(statement, s); + if (ENABLE_AUDIT_LOG) { + AuditLogger.log(statement, s); + } + + queryId = SESSION_MANAGER.requestQueryId(); + type = s.getType() == null ? null : s.getType().name(); + // create and cache dataset + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold()); + } else { + + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = + relationSqlParser.createStatement(statement, clientSession.getZoneId()); + + if (s instanceof Use) { + useDatabase = true; + } + + if (s == null) { + return RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"); + } + + // TODO: permission check + + // TODO audit log, quota, StatementType + + queryId = SESSION_MANAGER.requestQueryId(); + + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold()); } - long queryId = SESSION_MANAGER.requestQueryId(); - type = s.getType() == null ? null : s.getType().name(); - // create and cache dataset - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold()); results.add(result.status); } catch (Exception e) { LOGGER.warn("Error occurred when executing executeBatchStatement: ", e); @@ -1685,9 +1721,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); } - return isAllSuccessful - ? 
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully") - : RpcUtils.getStatus(results); + + if (isAllSuccessful) { + TSStatus res = + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch statements successfully"); + if (useDatabase) { + TSStatus useDB = RpcUtils.getStatus(TSStatusCode.USE_DB, clientSession.getDatabaseName()); + res.setSubStatus(Collections.singletonList(useDB)); + } + return res; + } else { + return RpcUtils.getStatus(results); + } } @Override