This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ExecuteBatchStatement
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a016db16976bddf9e1c915df17b03c69004c3b96
Author: JackieTien97 <jackietie...@gmail.com>
AuthorDate: Tue Jul 23 09:40:59 2024 +0800

    Support executeBatch in Table Model
---
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  11 +++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 105 +++++++++++++++------
 3 files changed, 88 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index ad37ca271a9..e7e045252e3 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -407,6 +407,17 @@ public class IoTDBStatement implements Statement {
         message.append(execResp.getMessage());
       }
     }
+    if (execResp.isSetSubStatus() && execResp.getSubStatus() != null) {
+      for (TSStatus status : execResp.getSubStatus()) {
+        if (status.getCode() == TSStatusCode.USE_DB.getStatusCode()
+            && status.isSetMessage()
+            && status.getMessage() != null
+            && !status.getMessage().isEmpty()) {
+          connection.changeDefaultDatabase(status.getMessage());
+          break;
+        }
+      }
+    }
     if (!allSuccess) {
       throw new BatchUpdateException(message.toString(), result);
     }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d41b247cdd2..6ecbb3061b2 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -53,6 +53,8 @@ public enum TSStatusCode {
   // Client,
   REDIRECTION_RECOMMEND(400),
 
+  USE_DB(401),
+
   // Schema Engine
   DATABASE_NOT_EXIST(500),
   DATABASE_ALREADY_EXISTS(501),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 83cf79a5c68..94a683e1165 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1624,6 +1624,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       return getNotLoggedInStatus();
     }
 
+    boolean useDatabase = false;
     try {
       for (int i = 0; i < req.getStatements().size(); i++) {
         String statement = req.getStatements().get(i);
@@ -1631,37 +1632,72 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         String type = null;
         OperationQuota quota = null;
         try {
-          Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
-          if (s == null) {
-            return RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is 
not supported");
-          }
-          // permission check
-          TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
-          if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
-            return status;
-          }
+          long queryId;
+          ExecutionResult result;
+          if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) 
{
+            Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
+            if (s == null) {
+              return RpcUtils.getStatus(
+                  TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type 
is not supported");
+            }
+            // permission check
+            TSStatus status = AuthorityChecker.checkAuthority(s, 
clientSession);
+            if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              return status;
+            }
 
-          quota =
-              DataNodeThrottleQuotaManager.getInstance()
-                  .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
s);
+            quota =
+                DataNodeThrottleQuotaManager.getInstance()
+                    
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
 
-          if (ENABLE_AUDIT_LOG) {
-            AuditLogger.log(statement, s);
+            if (ENABLE_AUDIT_LOG) {
+              AuditLogger.log(statement, s);
+            }
+
+            queryId = SESSION_MANAGER.requestQueryId();
+            type = s.getType() == null ? null : s.getType().name();
+            // create and cache dataset
+            result =
+                COORDINATOR.executeForTreeModel(
+                    s,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold());
+          } else {
+
+            org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
s =
+                relationSqlParser.createStatement(statement, 
clientSession.getZoneId());
+
+            if (s instanceof Use) {
+              useDatabase = true;
+            }
+
+            if (s == null) {
+              return RpcUtils.getStatus(
+                  TSStatusCode.SQL_PARSE_ERROR, "This operation type is not 
supported");
+            }
+
+            // TODO: permission check
+
+            // TODO audit log, quota, StatementType
+
+            queryId = SESSION_MANAGER.requestQueryId();
+
+            result =
+                COORDINATOR.executeForTableModel(
+                    s,
+                    relationSqlParser,
+                    clientSession,
+                    queryId,
+                    SESSION_MANAGER.getSessionInfo(clientSession),
+                    statement,
+                    metadata,
+                    config.getQueryTimeoutThreshold());
           }
 
-          long queryId = SESSION_MANAGER.requestQueryId();
-          type = s.getType() == null ? null : s.getType().name();
-          // create and cache dataset
-          ExecutionResult result =
-              COORDINATOR.executeForTreeModel(
-                  s,
-                  queryId,
-                  SESSION_MANAGER.getSessionInfo(clientSession),
-                  statement,
-                  partitionFetcher,
-                  schemaFetcher,
-                  config.getQueryTimeoutThreshold());
           results.add(result.status);
         } catch (Exception e) {
           LOGGER.warn("Error occurred when executing executeBatchStatement: ", 
e);
@@ -1685,9 +1721,18 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), 
System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
     }
-    return isAllSuccessful
-        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch 
statements successfully")
-        : RpcUtils.getStatus(results);
+
+    if (isAllSuccessful) {
+      TSStatus res =
+          RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute batch 
statements successfully");
+      if (useDatabase) {
+        TSStatus useDB = RpcUtils.getStatus(TSStatusCode.USE_DB, 
clientSession.getDatabaseName());
+        res.setSubStatus(Collections.singletonList(useDB));
+      }
+      return res;
+    } else {
+      return RpcUtils.getStatus(results);
+    }
   }
 
   @Override

Reply via email to