This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 06d27660b9c Make some uts more stable (#17720)
06d27660b9c is described below

commit 06d27660b9c486aaa5a595d3944c2629a2c714a0
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 19 13:34:04 2026 +0800

    Make some uts more stable (#17720)
---
 .../fragment/FragmentInstanceExecutionTest.java    | 124 ++++++++++++---------
 .../operator/SingleDeviceViewOperatorTest.java     |   3 +
 .../inner/InnerSequenceCompactionSpeedTest.java    |  65 -----------
 3 files changed, 74 insertions(+), 118 deletions(-)

diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 99a94eb02a3..93290fdf4da 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -51,6 +51,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.reader.IPointReader;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -62,6 +63,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -72,8 +74,16 @@ import static org.junit.Assert.fail;
 
 public class FragmentInstanceExecutionTest {
 
+  @BeforeClass
+  public static void setUpClass() {
+    // Initialize DataNodeId before any test to avoid ExceptionInInitializerError when
+    // Coordinator.<clinit> is triggered indirectly by async state-change listeners
+    // (e.g., via QueryRelatedResourceMetricSet -> Coordinator -> QueryIdGenerator).
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+  }
+
   @Test
-  public void testFragmentInstanceExecution() {
+  public void testFragmentInstanceExecution() throws InterruptedException {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
     try {
@@ -110,75 +120,84 @@ public class FragmentInstanceExecutionTest {
       e.printStackTrace();
       fail(e.getMessage());
     } finally {
-      instanceNotificationExecutor.shutdown();
+      shutdownAndAwaitTermination(instanceNotificationExecutor);
     }
   }
 
   @Test
   public void testTVListOwnerTransfer() throws InterruptedException {
-    // Capture System.err to check for warning messages
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+    // Capture System.out to check for warning messages. Set up the capture only after the
+    // executor is created so that any thread-pool init logging does not pollute the captured
+    // output. Capture must also be torn down before awaiting executor termination so any
+    // late async log output from this test goes to the original stream, not the captured one.
     PrintStream systemOut = System.out;
     ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
     System.setOut(new PrintStream(logPrint));
 
     try {
-      IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
-
-      ExecutorService instanceNotificationExecutor =
-          IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
-      try {
-        // TVList
-        TVList tvList = buildTVList();
-
-        // FragmentInstance Context & Execution
-        FragmentInstanceExecution execution1 =
-            createFragmentInstanceExecution(1, instanceNotificationExecutor);
-        FragmentInstanceContext fragmentInstanceContext1 = 
execution1.getFragmentInstanceContext();
-        fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
-        tvList.getQueryContextSet().add(fragmentInstanceContext1);
-
-        FragmentInstanceExecution execution2 =
-            createFragmentInstanceExecution(2, instanceNotificationExecutor);
-        FragmentInstanceContext fragmentInstanceContext2 = 
execution2.getFragmentInstanceContext();
-        fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
-        tvList.getQueryContextSet().add(fragmentInstanceContext2);
-
-        // mock flush's behavior
-        fragmentInstanceContext1
-            .getMemoryReservationContext()
-            .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
-        tvList.setOwnerQuery(fragmentInstanceContext1);
-
-        fragmentInstanceContext1.decrementNumOfUnClosedDriver();
-        fragmentInstanceContext2.decrementNumOfUnClosedDriver();
-
-        fragmentInstanceContext1.getStateMachine().finished();
-        Thread.sleep(100);
-        fragmentInstanceContext2.getStateMachine().finished();
-
-        assertTrue(execution1.getInstanceState().isDone());
-        assertTrue(execution2.getInstanceState().isDone());
-        Thread.sleep(100);
-      } catch (CpuNotEnoughException | MemoryNotEnoughException | 
IllegalArgumentException e) {
-        fail(e.getMessage());
-      } finally {
-        instanceNotificationExecutor.shutdown();
-      }
+      // TVList
+      TVList tvList = buildTVList();
+
+      // FragmentInstance Context & Execution
+      FragmentInstanceExecution execution1 =
+          createFragmentInstanceExecution(1, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext1 = 
execution1.getFragmentInstanceContext();
+      fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+      tvList.getQueryContextSet().add(fragmentInstanceContext1);
+
+      FragmentInstanceExecution execution2 =
+          createFragmentInstanceExecution(2, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext2 = 
execution2.getFragmentInstanceContext();
+      fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+      tvList.getQueryContextSet().add(fragmentInstanceContext2);
+
+      // mock flush's behavior
+      fragmentInstanceContext1
+          .getMemoryReservationContext()
+          .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
+      tvList.setOwnerQuery(fragmentInstanceContext1);
+
+      fragmentInstanceContext1.decrementNumOfUnClosedDriver();
+      fragmentInstanceContext2.decrementNumOfUnClosedDriver();
+
+      fragmentInstanceContext1.getStateMachine().finished();
+      Thread.sleep(100);
+      fragmentInstanceContext2.getStateMachine().finished();
+
+      assertTrue(execution1.getInstanceState().isDone());
+      assertTrue(execution2.getInstanceState().isDone());
+    } catch (CpuNotEnoughException | MemoryNotEnoughException | 
IllegalArgumentException e) {
+      fail(e.getMessage());
     } finally {
-      // Restore original System.out
+      // Restore original System.out before waiting for the executor so that any late
+      // async log output from listeners is written to the real stdout, not the captured buffer.
       System.setOut(systemOut);
+      shutdownAndAwaitTermination(instanceNotificationExecutor);
 
       // should not contain warn message: "The memory cost to be released is 
larger than the memory
       // cost of memory block"
       String capturedOutput = logPrint.toString();
-      assertTrue(capturedOutput.isEmpty());
+      assertFalse(
+          "captured stdout should not contain memory-block release warning, 
but was:\n"
+              + capturedOutput,
+          capturedOutput.contains(
+              "The memory cost to be released is larger than the memory cost 
of memory block"));
     }
   }
 
-  @Test
-  public void testTVListCloneForQuery() {
-    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+  private static void shutdownAndAwaitTermination(ExecutorService executor)
+      throws InterruptedException {
+    executor.shutdown();
+    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+      executor.shutdownNow();
+    }
+  }
 
+  @Test
+  public void testTVListCloneForQuery() throws InterruptedException {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
 
@@ -218,8 +237,7 @@ public class FragmentInstanceExecutionTest {
                   Collections.emptyMap()));
       ReadOnlyMemChunk readOnlyMemChunk1 =
           memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, 
null, null);
-      ReadOnlyMemChunk readOnlyMemChunk2 =
-          memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, 
null, null);
+      memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, 
null);
 
       IPointReader pointReader = readOnlyMemChunk1.getPointReader();
       while (pointReader.hasNextTimeValuePair()) {
@@ -234,7 +252,7 @@ public class FragmentInstanceExecutionTest {
         | IllegalArgumentException e) {
       fail(e.getMessage());
     } finally {
-      instanceNotificationExecutor.shutdown();
+      shutdownAndAwaitTermination(instanceNotificationExecutor);
     }
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
index cfe53f4e708..f88cdef3e2b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
@@ -174,6 +174,9 @@ public class SingleDeviceViewOperatorTest {
       int total = 0;
       while (singleDeviceViewOperator.isBlocked().isDone() && 
singleDeviceViewOperator.hasNext()) {
         TsBlock tsBlock = singleDeviceViewOperator.next();
+        if (tsBlock == null || tsBlock.isEmpty()) {
+          continue;
+        }
         assertEquals(4, tsBlock.getValueColumnCount());
         total += tsBlock.getPositionCount();
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
index f417c042e7c..106559842a7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -40,9 +39,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -64,67 +60,6 @@ public class InnerSequenceCompactionSpeedTest extends AbstractCompactionTest {
         .setCompactionReadThroughputRate(compactionReadThroughputPerSec);
   }
 
-  @Test
-  public void testManyAlignedDeviceTsFile() throws IOException, 
InterruptedException {
-    List<String> deviceNames = new ArrayList<>();
-    for (int i = 0; i < 100000; i++) {
-      deviceNames.add("d" + i);
-    }
-    TsFileResource resource = createEmptyFileAndResource(true);
-    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
-      for (String device : deviceNames) {
-        writer.startChunkGroup(device);
-        writer.generateSimpleAlignedSeriesToCurrentDevice(
-            Collections.singletonList("s0"),
-            new TimeRange[] {new TimeRange(1, 2)},
-            TSEncoding.PLAIN,
-            CompressionType.LZ4);
-        writer.endChunkGroup();
-      }
-      writer.endFile();
-    }
-    seqResources.add(resource);
-    tsFileManager.add(resource, true);
-    long tsFileSize = resource.getTsFileSize();
-    Thread thread =
-        new Thread(
-            () -> {
-              InnerSpaceCompactionTask task =
-                  new InnerSpaceCompactionTask(
-                      0, tsFileManager, seqResources, true, new 
ReadChunkCompactionPerformer(), 0);
-              task.start();
-            });
-    thread.start();
-    thread.join(TimeUnit.SECONDS.toMillis(30 + tsFileSize / IoTDBConstant.MB));
-  }
-
-  @Test
-  public void testManyNotAlignedDeviceTsFile() throws IOException {
-    List<String> deviceNames = new ArrayList<>();
-    for (int i = 0; i < 100000; i++) {
-      deviceNames.add("d" + i);
-    }
-    TsFileResource resource = createEmptyFileAndResource(true);
-    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
-      for (String device : deviceNames) {
-        writer.startChunkGroup(device);
-        writer.generateSimpleNonAlignedSeriesToCurrentDevice(
-            "s0", new TimeRange[] {new TimeRange(1, 2)}, TSEncoding.PLAIN, 
CompressionType.LZ4);
-        writer.endChunkGroup();
-      }
-      writer.endFile();
-    }
-    seqResources.add(resource);
-    tsFileManager.add(resource, true);
-    long tsFileSize = resource.getTsFileSize();
-    InnerSpaceCompactionTask task =
-        new InnerSpaceCompactionTask(
-            0, tsFileManager, seqResources, true, new 
ReadChunkCompactionPerformer(), 0);
-    Assert.assertTrue(task.start());
-    Assert.assertTrue(
-        TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) > 
task.getTimeCost());
-  }
-
   @Test
   public void testReadRateLimit() throws IOException, InterruptedException {
     int compactionReadOperationPerSec =

Reply via email to