This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d11770d7443 feat: add integration test support for StreamNode (#17952)
d11770d7443 is described below

commit d11770d74435b14d2d2f3ec820eb438a42705fce
Author: suchenglong <[email protected]>
AuthorDate: Thu Jun 18 10:55:23 2026 +0800

    feat: add integration test support for StreamNode (#17952)
---
 .../org/apache/iotdb/it/env/cluster/env/AIEnv.java | 28 +++++-
 .../iotdb/it/env/cluster/env/AbstractEnv.java      | 99 +++++++++-------------
 .../iotdb/it/env/cluster/node/AINodeStarter.java   | 84 ++++++++++++++++++
 .../itbase/runtime/ParallelRequestDelegate.java    | 13 ++-
 4 files changed, 160 insertions(+), 64 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
index f812e9db3d1..a15a6243b3c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AIEnv.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.it.env.cluster.env;
 
+import org.apache.iotdb.it.env.cluster.node.AINodeStarter;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+
+import java.util.List;
+
 public class AIEnv extends AbstractEnv {
+
   @Override
   public void initClusterEnvironment() {
     initClusterEnvironment(1, 1);
@@ -27,12 +34,29 @@ public class AIEnv extends AbstractEnv {
 
   @Override
   public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
-    super.initEnvironment(configNodesNum, dataNodesNum, 600, true);
+    super.initEnvironment(configNodesNum, dataNodesNum, 600);
   }
 
   @Override
   public void initClusterEnvironment(
       int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
-    super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, 
true);
+    super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
+  }
+
+  @Override
+  protected void initExtraNodes(
+      final List<ConfigNodeWrapper> configNodeWrappers,
+      final List<DataNodeWrapper> dataNodeWrappers,
+      final String testClassName) {
+    AINodeStarter.startAINode(
+        configNodeWrappers.get(0).getIpAndPortString(),
+        dataNodeWrappers.get(0).getPort(),
+        testClassName,
+        testMethodName,
+        index,
+        startTime,
+        extraNodeKillPoints,
+        this::registerExtraNode,
+        this::dumpTestJVMSnapshot);
   }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 0e7eefb3a41..77ccc25d936 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
 import org.apache.iotdb.it.env.cluster.config.MppConfigNodeConfig;
 import org.apache.iotdb.it.env.cluster.config.MppDataNodeConfig;
 import org.apache.iotdb.it.env.cluster.config.MppJVMConfig;
-import org.apache.iotdb.it.env.cluster.node.AINodeWrapper;
 import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
 import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -101,7 +100,7 @@ public abstract class AbstractEnv implements BaseEnv {
   private final Random rand = new Random();
   protected List<ConfigNodeWrapper> configNodeWrapperList = 
Collections.emptyList();
   protected List<DataNodeWrapper> dataNodeWrapperList = 
Collections.emptyList();
-  protected List<AINodeWrapper> aiNodeWrapperList = Collections.emptyList();
+  protected List<AbstractNodeWrapper> extraNodeWrappers = 
Collections.emptyList();
   protected String testMethodName = null;
   protected int index = 0;
   protected long startTime;
@@ -109,6 +108,7 @@ public abstract class AbstractEnv implements BaseEnv {
   private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> 
clientManager;
   private List<String> configNodeKillPoints = new ArrayList<>();
   private List<String> dataNodeKillPoints = new ArrayList<>();
+  protected List<String> extraNodeKillPoints = new ArrayList<>();
 
   /**
    * This config object stores the properties set by developers during the 
test. It will be cleared
@@ -169,17 +169,10 @@ public abstract class AbstractEnv implements BaseEnv {
 
   protected void initEnvironment(
       final int configNodesNum, final int dataNodesNum, final int 
testWorkingRetryCount) {
-    initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, 
false);
-  }
-
-  protected void initEnvironment(
-      final int configNodesNum,
-      final int dataNodesNum,
-      final int retryCount,
-      final boolean addAINode) {
-    this.retryCount = retryCount;
+    this.retryCount = testWorkingRetryCount;
     this.configNodeWrapperList = new ArrayList<>();
     this.dataNodeWrapperList = new ArrayList<>();
+    this.extraNodeWrappers = new ArrayList<>();
 
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
@@ -258,14 +251,34 @@ public abstract class AbstractEnv implements BaseEnv {
       throw new AssertionError();
     }
 
-    if (addAINode) {
-      this.aiNodeWrapperList = new ArrayList<>();
-      startAINode(seedConfigNode, this.dataNodeWrapperList.get(0).getPort(), 
testClassName);
-    }
+    initExtraNodes(configNodeWrapperList, dataNodeWrapperList, testClassName);
 
     checkClusterStatusWithoutUnknown();
   }
 
+  /**
+   * Hook method for subclasses to initialize and start extra node types 
beyond the core ConfigNode
+   * and DataNode (e.g., AINode, StreamNode, ProxyNode).
+   *
+   * <p>Subclasses should create node wrappers, add them to {@link 
#extraNodeWrappers}, configure
+   * kill points via {@link #extraNodeKillPoints}, and start the nodes. 
Subclasses have direct
+   * access to protected fields: {@code testMethodName}, {@code index}, {@code 
startTime}.
+   *
+   * @param configNodeWrappers list of all ConfigNode wrappers in the cluster 
(the env's live list; treat as read-only)
+   * @param dataNodeWrappers list of all DataNode wrappers in the cluster 
(the env's live list; treat as read-only)
+   * @param testClassName the test class name for logging and identification 
purposes
+   */
+  protected void initExtraNodes(
+      final List<ConfigNodeWrapper> configNodeWrappers,
+      final List<DataNodeWrapper> dataNodeWrappers,
+      final String testClassName) {
+    // Default: no extra nodes. Subclasses override to add nodes.
+  }
+
+  protected void registerExtraNode(final AbstractNodeWrapper nodeWrapper) {
+    extraNodeWrappers.add(nodeWrapper);
+  }
+
   private ConfigNodeWrapper newConfigNode() {
     final ConfigNodeWrapper configNodeWrapper =
         new ConfigNodeWrapper(
@@ -309,39 +322,6 @@ public abstract class AbstractEnv implements BaseEnv {
     return dataNodeWrapper;
   }
 
-  private void startAINode(
-      final String seedConfigNode, final int clusterIngressPort, final String 
testClassName) {
-    final String aiNodeEndPoint;
-    final AINodeWrapper aiNodeWrapper =
-        new AINodeWrapper(
-            seedConfigNode,
-            clusterIngressPort,
-            testClassName,
-            testMethodName,
-            index,
-            EnvUtils.searchAvailablePorts(),
-            startTime);
-    aiNodeWrapperList.add(aiNodeWrapper);
-    aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
-    aiNodeWrapper.createNodeDir();
-    aiNodeWrapper.createLogDir();
-    final RequestDelegate<Void> aiNodesDelegate =
-        new ParallelRequestDelegate<>(
-            Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT, 
this);
-
-    aiNodesDelegate.addRequest(
-        () -> {
-          aiNodeWrapper.start();
-          return null;
-        });
-
-    try {
-      aiNodesDelegate.requestAll();
-    } catch (final SQLException e) {
-      logger.error("Start aiNodes failed", e);
-    }
-  }
-
   public String getTestClassName() {
     final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
     for (final StackTraceElement stackTraceElement : stack) {
@@ -433,7 +413,7 @@ public abstract class AbstractEnv implements BaseEnv {
         if (showClusterResp.getNodeStatus().size()
             != configNodeWrapperList.size()
                 + dataNodeWrapperList.size()
-                + aiNodeWrapperList.size()) {
+                + extraNodeWrappers.size()) {
           passed = false;
           nodeSizePassed = false;
           actualNodeSize = showClusterResp.getNodeStatusSize();
@@ -465,7 +445,7 @@ public abstract class AbstractEnv implements BaseEnv {
             processStatusMap.put(nodeWrapper, 0);
           }
         }
-        for (AINodeWrapper nodeWrapper : aiNodeWrapperList) {
+        for (AbstractNodeWrapper nodeWrapper : extraNodeWrappers) {
           boolean alive = nodeWrapper.getInstance().isAlive();
           if (!alive) {
             processStatusMap.put(nodeWrapper, 
nodeWrapper.getInstance().waitFor());
@@ -568,14 +548,14 @@ public abstract class AbstractEnv implements BaseEnv {
               configNodeWrapper.start();
             }
           }
-          for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) {
-            if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) {
+          for (AbstractNodeWrapper extraNodeWrapper : extraNodeWrappers) {
+            if (portOccupationMap.containsValue(extraNodeWrapper.getPid())) {
               logger.info(
-                  "A port is occupied by another AINode {}-{}, restart it",
-                  aiNodeWrapper.getIpAndPortString(),
-                  aiNodeWrapper.getPid());
-              aiNodeWrapper.stop();
-              aiNodeWrapper.start();
+                  "A port is occupied by another node {}-{}, restart it",
+                  extraNodeWrapper.getIpAndPortString(),
+                  extraNodeWrapper.getPid());
+              extraNodeWrapper.stop();
+              extraNodeWrapper.start();
             }
           }
         } catch (IOException e) {
@@ -592,8 +572,8 @@ public abstract class AbstractEnv implements BaseEnv {
   public void cleanClusterEnvironment() {
     final List<AbstractNodeWrapper> allNodeWrappers =
         Stream.concat(
-                dataNodeWrapperList.stream(),
-                Stream.concat(configNodeWrapperList.stream(), 
aiNodeWrapperList.stream()))
+                Stream.concat(configNodeWrapperList.stream(), 
dataNodeWrapperList.stream()),
+                extraNodeWrappers.stream())
             .collect(Collectors.toList());
     allNodeWrappers.stream()
         .findAny()
@@ -1045,6 +1025,7 @@ public abstract class AbstractEnv implements BaseEnv {
   public List<AbstractNodeWrapper> getNodeWrapperList() {
     final List<AbstractNodeWrapper> result = new 
ArrayList<>(configNodeWrapperList);
     result.addAll(dataNodeWrapperList);
+    result.addAll(extraNodeWrappers);
     return result;
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
new file mode 100644
index 00000000000..cde3aaa9235
--- /dev/null
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AINodeStarter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.it.env.cluster.node;
+
+import org.apache.iotdb.it.env.cluster.EnvUtils;
+import org.apache.iotdb.it.framework.IoTDBTestLogger;
+import org.apache.iotdb.itbase.runtime.ParallelRequestDelegate;
+import org.apache.iotdb.itbase.runtime.RequestDelegate;
+
+import org.slf4j.Logger;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.NODE_START_TIMEOUT;
+
+public class AINodeStarter {
+  private static final Logger logger = IoTDBTestLogger.logger;
+
+  private AINodeStarter() {}
+
+  public static AINodeWrapper startAINode(
+      final String seedConfigNode,
+      final int clusterIngressPort,
+      final String testClassName,
+      final String testMethodName,
+      final int clusterIndex,
+      final long startTime,
+      final List<String> killPoints,
+      final Consumer<AINodeWrapper> nodeRegister,
+      final Runnable dumpTestJVMSnapshot) {
+    final AINodeWrapper aiNodeWrapper =
+        new AINodeWrapper(
+            seedConfigNode,
+            clusterIngressPort,
+            testClassName,
+            testMethodName,
+            clusterIndex,
+            EnvUtils.searchAvailablePorts(),
+            startTime);
+    nodeRegister.accept(aiNodeWrapper);
+    aiNodeWrapper.setKillPoints(killPoints);
+    aiNodeWrapper.createNodeDir();
+    aiNodeWrapper.createLogDir();
+
+    final RequestDelegate<Void> aiNodesDelegate =
+        new ParallelRequestDelegate<>(
+            Collections.singletonList(aiNodeWrapper.getIpAndPortString()),
+            NODE_START_TIMEOUT,
+            dumpTestJVMSnapshot);
+    aiNodesDelegate.addRequest(
+        () -> {
+          aiNodeWrapper.start();
+          return null;
+        });
+
+    try {
+      aiNodesDelegate.requestAll();
+    } catch (final SQLException e) {
+      logger.error("Start AINode {} failed", aiNodeWrapper.getId(), e);
+      throw new AssertionError();
+    }
+    return aiNodeWrapper;
+  }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
index 437a6da6e5b..0615fe950fd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ParallelRequestDelegate.java
@@ -37,13 +37,20 @@ import java.util.concurrent.TimeoutException;
  */
 public class ParallelRequestDelegate<T> extends RequestDelegate<T> {
   private final int taskTimeoutSeconds;
-  private final AbstractEnv env;
+  private final Runnable dumpTestJVMSnapshot;
 
   public ParallelRequestDelegate(
       final List<String> endpoints, final int taskTimeoutSeconds, final 
AbstractEnv env) {
+    this(endpoints, taskTimeoutSeconds, env != null ? env::dumpTestJVMSnapshot 
: () -> {});
+  }
+
+  public ParallelRequestDelegate(
+      final List<String> endpoints,
+      final int taskTimeoutSeconds,
+      final Runnable dumpTestJVMSnapshot) {
     super(endpoints);
     this.taskTimeoutSeconds = taskTimeoutSeconds;
-    this.env = env;
+    this.dumpTestJVMSnapshot = dumpTestJVMSnapshot;
   }
 
   public List<T> requestAll() throws SQLException {
@@ -60,7 +67,7 @@ public class ParallelRequestDelegate<T> extends 
RequestDelegate<T> {
       } catch (ExecutionException e) {
         exceptions[i] = e;
       } catch (InterruptedException | TimeoutException e) {
-        env.dumpTestJVMSnapshot();
+        dumpTestJVMSnapshot.run();
         for (int j = i; j < getEndpoints().size(); j++) {
           resultFutures.get(j).cancel(true);
         }

Reply via email to