This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 06d27660b9c Make some uts more stable (#17720)
06d27660b9c is described below
commit 06d27660b9c486aaa5a595d3944c2629a2c714a0
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 19 13:34:04 2026 +0800
Make some uts more stable (#17720)
---
.../fragment/FragmentInstanceExecutionTest.java | 124 ++++++++++++---------
.../operator/SingleDeviceViewOperatorTest.java | 3 +
.../inner/InnerSequenceCompactionSpeedTest.java | 65 -----------
3 files changed, 74 insertions(+), 118 deletions(-)
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 99a94eb02a3..93290fdf4da 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -51,6 +51,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -62,6 +63,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -72,8 +74,16 @@ import static org.junit.Assert.fail;
public class FragmentInstanceExecutionTest {
+ @BeforeClass
+ public static void setUpClass() {
+ // Initialize DataNodeId before any test to avoid ExceptionInInitializerError when
+ // Coordinator.<clinit> is triggered indirectly by async state-change listeners
+ // (e.g., via QueryRelatedResourceMetricSet -> Coordinator -> QueryIdGenerator).
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+ }
+
@Test
- public void testFragmentInstanceExecution() {
+ public void testFragmentInstanceExecution() throws InterruptedException {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
try {
@@ -110,75 +120,84 @@ public class FragmentInstanceExecutionTest {
e.printStackTrace();
fail(e.getMessage());
} finally {
- instanceNotificationExecutor.shutdown();
+ shutdownAndAwaitTermination(instanceNotificationExecutor);
}
}
@Test
public void testTVListOwnerTransfer() throws InterruptedException {
- // Capture System.err to check for warning messages
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+
+ // Capture System.out to check for warning messages. Set up the capture only after the
+ // executor is created so that any thread-pool init logging does not pollute the captured
+ // output. Capture must also be torn down before awaiting executor termination so any
+ // late async log output from this test goes to the original stream, not the captured one.
PrintStream systemOut = System.out;
ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
System.setOut(new PrintStream(logPrint));
try {
- IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
-
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
- try {
- // TVList
- TVList tvList = buildTVList();
-
- // FragmentInstance Context & Execution
- FragmentInstanceExecution execution1 =
- createFragmentInstanceExecution(1, instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext1 =
execution1.getFragmentInstanceContext();
- fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
- tvList.getQueryContextSet().add(fragmentInstanceContext1);
-
- FragmentInstanceExecution execution2 =
- createFragmentInstanceExecution(2, instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext2 =
execution2.getFragmentInstanceContext();
- fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
- tvList.getQueryContextSet().add(fragmentInstanceContext2);
-
- // mock flush's behavior
- fragmentInstanceContext1
- .getMemoryReservationContext()
- .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
- tvList.setOwnerQuery(fragmentInstanceContext1);
-
- fragmentInstanceContext1.decrementNumOfUnClosedDriver();
- fragmentInstanceContext2.decrementNumOfUnClosedDriver();
-
- fragmentInstanceContext1.getStateMachine().finished();
- Thread.sleep(100);
- fragmentInstanceContext2.getStateMachine().finished();
-
- assertTrue(execution1.getInstanceState().isDone());
- assertTrue(execution2.getInstanceState().isDone());
- Thread.sleep(100);
- } catch (CpuNotEnoughException | MemoryNotEnoughException |
IllegalArgumentException e) {
- fail(e.getMessage());
- } finally {
- instanceNotificationExecutor.shutdown();
- }
+ // TVList
+ TVList tvList = buildTVList();
+
+ // FragmentInstance Context & Execution
+ FragmentInstanceExecution execution1 =
+ createFragmentInstanceExecution(1, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext1 =
execution1.getFragmentInstanceContext();
+ fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+ tvList.getQueryContextSet().add(fragmentInstanceContext1);
+
+ FragmentInstanceExecution execution2 =
+ createFragmentInstanceExecution(2, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext2 =
execution2.getFragmentInstanceContext();
+ fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+ tvList.getQueryContextSet().add(fragmentInstanceContext2);
+
+ // mock flush's behavior
+ fragmentInstanceContext1
+ .getMemoryReservationContext()
+ .reserveMemoryCumulatively(tvList.calculateRamSize().getRamSize());
+ tvList.setOwnerQuery(fragmentInstanceContext1);
+
+ fragmentInstanceContext1.decrementNumOfUnClosedDriver();
+ fragmentInstanceContext2.decrementNumOfUnClosedDriver();
+
+ fragmentInstanceContext1.getStateMachine().finished();
+ Thread.sleep(100);
+ fragmentInstanceContext2.getStateMachine().finished();
+
+ assertTrue(execution1.getInstanceState().isDone());
+ assertTrue(execution2.getInstanceState().isDone());
+ } catch (CpuNotEnoughException | MemoryNotEnoughException |
IllegalArgumentException e) {
+ fail(e.getMessage());
} finally {
- // Restore original System.out
+ // Restore original System.out before waiting for the executor so that any late
+ // async log output from listeners is written to the real stdout, not the captured buffer.
System.setOut(systemOut);
+ shutdownAndAwaitTermination(instanceNotificationExecutor);
// should not contain warn message: "The memory cost to be released is larger than the memory
// cost of memory block"
String capturedOutput = logPrint.toString();
- assertTrue(capturedOutput.isEmpty());
+ assertFalse(
+ "captured stdout should not contain memory-block release warning,
but was:\n"
+ + capturedOutput,
+ capturedOutput.contains(
+ "The memory cost to be released is larger than the memory cost
of memory block"));
}
}
- @Test
- public void testTVListCloneForQuery() {
- IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+ private static void shutdownAndAwaitTermination(ExecutorService executor)
+ throws InterruptedException {
+ executor.shutdown();
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ }
+ @Test
+ public void testTVListCloneForQuery() throws InterruptedException {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
@@ -218,8 +237,7 @@ public class FragmentInstanceExecutionTest {
Collections.emptyMap()));
ReadOnlyMemChunk readOnlyMemChunk1 =
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE,
null, null);
- ReadOnlyMemChunk readOnlyMemChunk2 =
- memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE,
null, null);
+ memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null,
null);
IPointReader pointReader = readOnlyMemChunk1.getPointReader();
while (pointReader.hasNextTimeValuePair()) {
@@ -234,7 +252,7 @@ public class FragmentInstanceExecutionTest {
| IllegalArgumentException e) {
fail(e.getMessage());
} finally {
- instanceNotificationExecutor.shutdown();
+ shutdownAndAwaitTermination(instanceNotificationExecutor);
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
index cfe53f4e708..f88cdef3e2b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
@@ -174,6 +174,9 @@ public class SingleDeviceViewOperatorTest {
int total = 0;
while (singleDeviceViewOperator.isBlocked().isDone() &&
singleDeviceViewOperator.hasNext()) {
TsBlock tsBlock = singleDeviceViewOperator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
assertEquals(4, tsBlock.getValueColumnCount());
total += tsBlock.getPositionCount();
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
index f417c042e7c..106559842a7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSequenceCompactionSpeedTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.inner;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -40,9 +39,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -64,67 +60,6 @@ public class InnerSequenceCompactionSpeedTest extends
AbstractCompactionTest {
.setCompactionReadThroughputRate(compactionReadThroughputPerSec);
}
- @Test
- public void testManyAlignedDeviceTsFile() throws IOException,
InterruptedException {
- List<String> deviceNames = new ArrayList<>();
- for (int i = 0; i < 100000; i++) {
- deviceNames.add("d" + i);
- }
- TsFileResource resource = createEmptyFileAndResource(true);
- try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
- for (String device : deviceNames) {
- writer.startChunkGroup(device);
- writer.generateSimpleAlignedSeriesToCurrentDevice(
- Collections.singletonList("s0"),
- new TimeRange[] {new TimeRange(1, 2)},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.endChunkGroup();
- }
- writer.endFile();
- }
- seqResources.add(resource);
- tsFileManager.add(resource, true);
- long tsFileSize = resource.getTsFileSize();
- Thread thread =
- new Thread(
- () -> {
- InnerSpaceCompactionTask task =
- new InnerSpaceCompactionTask(
- 0, tsFileManager, seqResources, true, new
ReadChunkCompactionPerformer(), 0);
- task.start();
- });
- thread.start();
- thread.join(TimeUnit.SECONDS.toMillis(30 + tsFileSize / IoTDBConstant.MB));
- }
-
- @Test
- public void testManyNotAlignedDeviceTsFile() throws IOException {
- List<String> deviceNames = new ArrayList<>();
- for (int i = 0; i < 100000; i++) {
- deviceNames.add("d" + i);
- }
- TsFileResource resource = createEmptyFileAndResource(true);
- try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
- for (String device : deviceNames) {
- writer.startChunkGroup(device);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s0", new TimeRange[] {new TimeRange(1, 2)}, TSEncoding.PLAIN,
CompressionType.LZ4);
- writer.endChunkGroup();
- }
- writer.endFile();
- }
- seqResources.add(resource);
- tsFileManager.add(resource, true);
- long tsFileSize = resource.getTsFileSize();
- InnerSpaceCompactionTask task =
- new InnerSpaceCompactionTask(
- 0, tsFileManager, seqResources, true, new
ReadChunkCompactionPerformer(), 0);
- Assert.assertTrue(task.start());
- Assert.assertTrue(
- TimeUnit.SECONDS.toMillis(tsFileSize / IoTDBConstant.MB + 30) >
task.getTimeCost());
- }
-
@Test
public void testReadRateLimit() throws IOException, InterruptedException {
int compactionReadOperationPerSec =