This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b8cf54b4d91 Speed up AINode IT: symlink built-in weights, batch inserts, trim concurrent loop (#17719)
b8cf54b4d91 is described below

commit b8cf54b4d9146830feecfec2e22a55cdd890dcbc
Author: Yongzao <[email protected]>
AuthorDate: Tue May 19 16:41:06 2026 +0800

    Speed up AINode IT: symlink built-in weights, batch inserts, trim concurrent loop (#17719)
---
 .../iotdb/it/env/cluster/node/AINodeWrapper.java   |  66 +++++----
 .../iotdb/ainode/it/AINodeClusterConfigIT.java     | 127 ----------------
 .../ainode/it/AINodeConcurrentForecastIT.java      | 120 ---------------
 .../iotdb/ainode/it/AINodeSharedClusterIT.java     | 164 ++++++++++++++++++++-
 .../apache/iotdb/ainode/utils/AINodeTestUtils.java |  21 ++-
 5 files changed, 221 insertions(+), 277 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java
index d452e19f381..91faeae8eee 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeWrapper.java
@@ -131,37 +131,53 @@ public class AINodeWrapper extends AbstractNodeWrapper {
           },
           propertiesFile);
 
-      // copy built-in LTSM
+      // Link built-in LTSM weights from the runner-wide cache. These can be hundreds of MB to
+      // multiple GB; copying them per fork dominates IT startup. Symlinks share read-only weights
+      // across forks; we fall back to a copy on platforms / filesystems that reject symlinks.
       String builtInModelPath = filePrefix + File.separator + BUILT_IN_MODEL_PATH;
-      new File(builtInModelPath).mkdirs();
+      File builtInModelDir = new File(builtInModelPath);
       try {
-        if (new File(builtInModelPath).exists()) {
-          PathUtils.deleteDirectory(Paths.get(builtInModelPath));
+        if (builtInModelDir.exists()) {
+          PathUtils.deleteDirectory(builtInModelDir.toPath());
         }
       } catch (NoSuchFileException e) {
         // ignored
       }
-      try (Stream<Path> s = Files.walk(Paths.get(CACHE_BUILT_IN_MODEL_PATH))) {
-        s.forEach(
-            source -> {
-              Path destination =
-                  Paths.get(
-                      builtInModelPath,
-                      source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length()));
-              logger.info("AINode copying model weights from {} to {}", source, destination);
-              try {
-                Files.copy(
-                    source,
-                    destination,
-                    LinkOption.NOFOLLOW_LINKS,
-                    StandardCopyOption.COPY_ATTRIBUTES);
-              } catch (IOException e) {
-                logger.error("AINode got error copying model weights", e);
-                throw new RuntimeException(e);
-              }
-            });
-      } catch (Exception e) {
-        logger.error("AINode got error copying model weights", e);
+      Path cacheRoot = Paths.get(CACHE_BUILT_IN_MODEL_PATH);
+      Path destRoot = builtInModelDir.toPath();
+      builtInModelDir.getParentFile().mkdirs();
+      try {
+        Files.createSymbolicLink(destRoot, cacheRoot);
+        logger.info("AINode symlinked model weights {} -> {}", destRoot, cacheRoot);
+      } catch (UnsupportedOperationException | IOException symlinkErr) {
+        logger.warn(
+            "AINode failed to symlink {} -> {} ({}), falling back to copy",
+            destRoot,
+            cacheRoot,
+            symlinkErr.toString());
+        builtInModelDir.mkdirs();
+        try (Stream<Path> s = Files.walk(cacheRoot)) {
+          s.forEach(
+              source -> {
+                Path destination =
+                    Paths.get(
+                        builtInModelPath,
+                        source.toString().substring(CACHE_BUILT_IN_MODEL_PATH.length()));
+                logger.info("AINode copying model weights from {} to {}", source, destination);
+                try {
+                  Files.copy(
+                      source,
+                      destination,
+                      LinkOption.NOFOLLOW_LINKS,
+                      StandardCopyOption.COPY_ATTRIBUTES);
+                } catch (IOException e) {
+                  logger.error("AINode got error copying model weights", e);
+                  throw new RuntimeException(e);
+                }
+              });
+        } catch (Exception e) {
+          logger.error("AINode got error copying model weights", e);
+        }
       }
 
       // start AINode
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
deleted file mode 100644
index de6d3c48be3..00000000000
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.ainode.it;
-
-import org.apache.iotdb.it.env.EnvFactory;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.AIClusterIT;
-import org.apache.iotdb.itbase.env.BaseEnv;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader;
-import static org.junit.Assert.assertEquals;
-
-@RunWith(IoTDBTestRunner.class)
-@Category({AIClusterIT.class})
-public class AINodeClusterConfigIT {
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    EnvFactory.getEnv().initClusterEnvironment(1, 1);
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
-  }
-
-  @Test
-  public void aiNodeRegisterAndRemoveTest() throws SQLException {
-    String show_sql = "SHOW AINODES";
-    String title = "NodeID,Status,InternalAddress,InternalPort";
-
-    // Verify AINode exists via both dialects before removal
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      verifyAINodeExists(statement, show_sql, title);
-    }
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      verifyAINodeExists(statement, show_sql, title);
-    }
-
-    // Remove AINode
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      statement.execute("REMOVE AINODE");
-      waitForAINodeRemoval(statement, show_sql, title);
-    }
-
-    // Verify removal is visible via table dialect as well
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      try (ResultSet resultSet = statement.executeQuery(show_sql)) {
-        checkHeader(resultSet.getMetaData(), title);
-        int count = 0;
-        while (resultSet.next()) {
-          count++;
-        }
-        assertEquals(0, count);
-      }
-    }
-  }
-
-  private static void verifyAINodeExists(Statement statement, String showSql, String title)
-      throws SQLException {
-    try (ResultSet resultSet = statement.executeQuery(showSql)) {
-      checkHeader(resultSet.getMetaData(), title);
-      int count = 0;
-      while (resultSet.next()) {
-        assertEquals("2", resultSet.getString(1));
-        assertEquals("Running", resultSet.getString(2));
-        count++;
-      }
-      assertEquals(1, count);
-    }
-  }
-
-  private static void waitForAINodeRemoval(Statement statement, String showSql, String title)
-      throws SQLException {
-    for (int retry = 0; retry < 500; retry++) {
-      try (ResultSet resultSet = statement.executeQuery(showSql)) {
-        checkHeader(resultSet.getMetaData(), title);
-        int count = 0;
-        while (resultSet.next()) {
-          count++;
-        }
-        if (count == 0) {
-          return;
-        }
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    Assert.fail("The target AINode is not removed successfully after all retries.");
-  }
-}
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java
deleted file mode 100644
index fd021099d5f..00000000000
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentForecastIT.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.ainode.it;
-
-import org.apache.iotdb.ainode.utils.AINodeTestUtils;
-import org.apache.iotdb.it.env.EnvFactory;
-import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.AIClusterIT;
-import org.apache.iotdb.itbase.env.BaseEnv;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelNotOnSpecifiedDevice;
-import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelOnSpecifiedDevice;
-import static org.apache.iotdb.ainode.utils.AINodeTestUtils.concurrentInference;
-
-@RunWith(IoTDBTestRunner.class)
-@Category({AIClusterIT.class})
-public class AINodeConcurrentForecastIT {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AINodeConcurrentForecastIT.class);
-
-  private static final List<AINodeTestUtils.FakeModelInfo> MODEL_LIST =
-      Arrays.asList(
-          new AINodeTestUtils.FakeModelInfo("sundial", "sundial", "builtin", "active"),
-          new AINodeTestUtils.FakeModelInfo("timer_xl", "timer", "builtin", "active"));
-
-  private static final String FORECAST_TABLE_FUNCTION_SQL_TEMPLATE =
-      "SELECT * FROM FORECAST(model_id=>'%s', targets=>(SELECT time,s FROM root.AI) ORDER BY time, output_length=>%d)";
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    // Init 1C1D1A cluster environment
-    EnvFactory.getEnv().initClusterEnvironment(1, 1);
-    prepareDataForTableModel();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
-  }
-
-  private static void prepareDataForTableModel() throws SQLException {
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      statement.execute("CREATE DATABASE root");
-      statement.execute("CREATE TABLE root.AI (s DOUBLE FIELD)");
-      for (int i = 0; i < 2880; i++) {
-        statement.execute(
-            String.format(
-                "INSERT INTO root.AI(time, s) VALUES(%d, %f)", i, Math.sin(i * Math.PI / 1440)));
-      }
-    }
-  }
-
-  @Test
-  public void concurrentForecastTest() throws SQLException, InterruptedException {
-    for (AINodeTestUtils.FakeModelInfo modelInfo : MODEL_LIST) {
-      concurrentGPUForecastTest(modelInfo, "0,1");
-      // TODO: Enable cpu test after optimize memory consumption
-      //      concurrentGPUForecastTest(modelInfo, "cpu");
-    }
-  }
-
-  public void concurrentGPUForecastTest(AINodeTestUtils.FakeModelInfo modelInfo, String devices)
-      throws SQLException, InterruptedException {
-    final int forecastLength = 512;
-    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
-        Statement statement = connection.createStatement()) {
-      // Single forecast request can be processed successfully
-      final String forecastSQL =
-          String.format(
-              FORECAST_TABLE_FUNCTION_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength);
-      final int threadCnt = 10;
-      final int loop = 100;
-      statement.execute(
-          String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices));
-      checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
-      long startTime = System.currentTimeMillis();
-      concurrentInference(statement, forecastSQL, threadCnt, loop, forecastLength);
-      long endTime = System.currentTimeMillis();
-      LOGGER.info(
-          String.format(
-              "Model %s concurrent inference %d reqs (%d threads, %d loops) in GPU takes time: %dms",
-              modelInfo.getModelId(), threadCnt * loop, threadCnt, loop, endTime - startTime));
-      statement.execute(
-          String.format("UNLOAD MODEL %s FROM DEVICES '%s'", modelInfo.getModelId(), devices));
-      checkModelNotOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
-    }
-  }
-}
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
index 7a71682f167..1686e916ff0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeSharedClusterIT.java
@@ -29,9 +29,13 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -49,6 +53,7 @@ import static org.apache.iotdb.ainode.utils.AINodeTestUtils.BUILTIN_MODEL_MAP;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkHeader;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelNotOnSpecifiedDevice;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.checkModelOnSpecifiedDevice;
+import static org.apache.iotdb.ainode.utils.AINodeTestUtils.concurrentInference;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.errorTest;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTable;
 import static org.apache.iotdb.ainode.utils.AINodeTestUtils.prepareDataInTree;
@@ -59,13 +64,21 @@ import static org.junit.Assert.fail;
 
 /**
  * Consolidates AINodeDeviceManageIT, AINodeModelManageIT, AINodeCallInferenceIT, AINodeForecastIT,
- * and AINodeInstanceManagementIT into a single class that shares one 1C1D1A cluster, avoiding 5
- * redundant cluster startups (~20 min saved).
+ * AINodeInstanceManagementIT, AINodeConcurrentForecastIT, and AINodeClusterConfigIT into a single
+ * class that shares one 1C1D1A cluster, avoiding 7 redundant cluster startups.
+ *
+ * <p>Test methods run in alphabetical order via {@link FixMethodOrder} so that {@link
+ * #zzAiNodeRegisterAndRemoveMustRunLast()} (which calls {@code REMOVE AINODE} and tears the AINode
+ * out of the cluster) executes after every other test. Do <strong>not</strong> add new
+ * {@code @Test} methods whose names sort after {@code zz}.
  */
 @RunWith(IoTDBTestRunner.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @Category({AIClusterIT.class})
 public class AINodeSharedClusterIT {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(AINodeSharedClusterIT.class);
+
   private static final String TARGET_DEVICES_STR = "0,1";
   private static final Set<String> TARGET_DEVICES =
       new HashSet<>(Arrays.asList(TARGET_DEVICES_STR.split(",")));
@@ -87,11 +100,19 @@ public class AINodeSharedClusterIT {
           + "timecol=>'%s'"
           + ")";
 
+  private static final List<FakeModelInfo> CONCURRENT_FORECAST_MODELS =
+      Arrays.asList(
+          new FakeModelInfo("sundial", "sundial", "builtin", "active"),
+          new FakeModelInfo("timer_xl", "timer", "builtin", "active"));
+  private static final String CONCURRENT_FORECAST_SQL_TEMPLATE =
+      "SELECT * FROM FORECAST(model_id=>'%s', targets=>(SELECT time,s FROM concurrent_db.AI) ORDER BY time, output_length=>%d)";
+
   @BeforeClass
   public static void setUp() throws Exception {
     EnvFactory.getEnv().initClusterEnvironment(1, 1);
     prepareDataInTree();
     prepareDataInTable();
+    prepareDataForConcurrentForecast();
   }
 
   @AfterClass
@@ -168,6 +189,8 @@ public class AINodeSharedClusterIT {
     }
   }
 
+  // ========== BuiltinModel tests ==========
+
   @Test
   public void dropBuiltInModelErrorTestInTree() throws SQLException {
     try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
@@ -408,6 +431,63 @@ public class AINodeSharedClusterIT {
         statement, invalidTimecolSQL2, "701: The type of the column [s0] is not as expected.");
   }
 
+  // ========== Concurrent forecast tests ==========
+
+  @Test
+  public void concurrentForecastTest() throws SQLException, InterruptedException {
+    for (FakeModelInfo modelInfo : CONCURRENT_FORECAST_MODELS) {
+      concurrentGPUForecastTest(modelInfo, "0,1");
+      // TODO: Enable cpu test after optimize memory consumption
+      // concurrentGPUForecastTest(modelInfo, "cpu");
+    }
+  }
+
+  private void concurrentGPUForecastTest(FakeModelInfo modelInfo, String devices)
+      throws SQLException, InterruptedException {
+    final int forecastLength = 512;
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      final String forecastSQL =
+          String.format(CONCURRENT_FORECAST_SQL_TEMPLATE, modelInfo.getModelId(), forecastLength);
+      final int threadCnt = 10;
+      // PR CI keeps a concurrency smoke check; nightly/daily can dial this up if regressions
+      // appear.
+      final int loop = 10;
+      statement.execute(
+          String.format("LOAD MODEL %s TO DEVICES '%s'", modelInfo.getModelId(), devices));
+      checkModelOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
+      long startTime = System.currentTimeMillis();
+      concurrentInference(statement, forecastSQL, threadCnt, loop, forecastLength);
+      long endTime = System.currentTimeMillis();
+      LOGGER.info(
+          String.format(
+              "Model %s concurrent inference %d reqs (%d threads, %d loops) in GPU takes time: %dms",
+              modelInfo.getModelId(), threadCnt * loop, threadCnt, loop, endTime - startTime));
+      statement.execute(
+          String.format("UNLOAD MODEL %s FROM DEVICES '%s'", modelInfo.getModelId(), devices));
+      checkModelNotOnSpecifiedDevice(statement, modelInfo.getModelId(), devices);
+    }
+  }
+
+  private static void prepareDataForConcurrentForecast() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE concurrent_db");
+      statement.execute("CREATE TABLE concurrent_db.AI (s DOUBLE FIELD)");
+      for (int i = 0; i < 2880; i++) {
+        statement.addBatch(
+            String.format(
+                "INSERT INTO concurrent_db.AI(time, s) VALUES(%d, %f)",
+                i, Math.sin(i * Math.PI / 1440)));
+        if ((i + 1) % 500 == 0) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
+      }
+      statement.executeBatch();
+    }
+  }
+
   // ========== InstanceManagement tests ==========
 
   @Test
@@ -490,6 +570,86 @@ public class AINodeSharedClusterIT {
         "1510: Device ID list contains duplicate entries.");
   }
 
+  // ========== AINode lifecycle (must run last) ==========
+
+  /**
+   * REMOVE AINODE permanently tears the AINode out of the cluster, so this test must run after
+   * every other {@code @Test} in the class. The {@code zz} prefix combined with {@link
+   * FixMethodOrder} on the class keeps it last under alphabetical sorting; do not add new test
+   * methods whose names sort after this one.
+   */
+  @Test
+  public void zzAiNodeRegisterAndRemoveMustRunLast() throws SQLException {
+    final String showSql = "SHOW AINODES";
+    final String title = "NodeID,Status,InternalAddress,InternalPort";
+
+    // Verify AINode exists via both dialects before removal
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      verifyAINodeExists(statement, showSql, title);
+    }
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      verifyAINodeExists(statement, showSql, title);
+    }
+
+    // Remove AINode
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("REMOVE AINODE");
+      waitForAINodeRemoval(statement, showSql, title);
+    }
+
+    // Verify removal is visible via table dialect as well
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(showSql)) {
+        checkHeader(resultSet.getMetaData(), title);
+        int count = 0;
+        while (resultSet.next()) {
+          count++;
+        }
+        assertEquals(0, count);
+      }
+    }
+  }
+
+  private static void verifyAINodeExists(Statement statement, String showSql, String title)
+      throws SQLException {
+    try (ResultSet resultSet = statement.executeQuery(showSql)) {
+      checkHeader(resultSet.getMetaData(), title);
+      int count = 0;
+      while (resultSet.next()) {
+        assertEquals("2", resultSet.getString(1));
+        assertEquals("Running", resultSet.getString(2));
+        count++;
+      }
+      assertEquals(1, count);
+    }
+  }
+
+  private static void waitForAINodeRemoval(Statement statement, String showSql, String title)
+      throws SQLException {
+    for (int retry = 0; retry < 500; retry++) {
+      try (ResultSet resultSet = statement.executeQuery(showSql)) {
+        checkHeader(resultSet.getMetaData(), title);
+        int count = 0;
+        while (resultSet.next()) {
+          count++;
+        }
+        if (count == 0) {
+          return;
+        }
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    Assert.fail("The target AINode is not removed successfully after all retries.");
+  }
+
   // ========== Helper methods (from ModelManageIT) ==========
 
   private static void registerUserDefinedModel(
diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java
index d69a301c066..8f5567c7c9c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/ainode/utils/AINodeTestUtils.java
@@ -232,11 +232,16 @@ public class AINodeTestUtils {
     try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
         Statement statement = connection.createStatement()) {
       for (int i = 0; i < 5760; i++) {
-        statement.execute(
+        statement.addBatch(
             String.format(
                 "INSERT INTO root.AI(timestamp,s0,s1,s2,s3) VALUES(%d,%f,%f,%d,%d)",
                 i, (float) i, (double) i, i, i));
+        if ((i + 1) % 500 == 0) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
       }
+      statement.executeBatch();
     }
   }
 
@@ -248,11 +253,16 @@ public class AINodeTestUtils {
       statement.execute(
           "CREATE TABLE db.AI (s0 FLOAT FIELD, s1 DOUBLE FIELD, s2 INT32 FIELD, s3 INT64 FIELD)");
       for (int i = 0; i < 5760; i++) {
-        statement.execute(
+        statement.addBatch(
             String.format(
                 "INSERT INTO db.AI(time,s0,s1,s2,s3) VALUES(%d,%f,%f,%d,%d)",
                 i, (float) i, (double) i, i, i));
+        if ((i + 1) % 500 == 0) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
       }
+      statement.executeBatch();
     }
   }
 
@@ -264,7 +274,7 @@ public class AINodeTestUtils {
       statement.execute(
           "CREATE TABLE db.AI2 (s0 FLOAT FIELD, s1 DOUBLE FIELD, s2 INT32 FIELD, s3 INT64 FIELD, s4 FLOAT FIELD, s5 DOUBLE FIELD, s6 INT32 FIELD, s7 INT64 FIELD, s8 FLOAT FIELD, s9 DOUBLE FIELD)");
       for (int i = 0; i < 2880; i++) {
-        statement.execute(
+        statement.addBatch(
             String.format(
                 "INSERT INTO db.AI2(time,s0,s1,s2,s3,s4,s5,s6,s7,s8,s9) VALUES(%d,%f,%f,%d,%d,%f,%f,%d,%d,%f,%f)",
                 i,
@@ -278,7 +288,12 @@ public class AINodeTestUtils {
                 i * 2,
                 (float) (i * 3),
                 (double) (i * 3)));
+        if ((i + 1) % 500 == 0) {
+          statement.executeBatch();
+          statement.clearBatch();
+        }
       }
+      statement.executeBatch();
     }
   }
 

Reply via email to