This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch 
check_consensus_before_answering_region_request
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f8f5a7f5d286d09a97bde91fa14a71067c2a699c
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Apr 23 18:02:49 2026 +0800

    Add Await and DataNodeContext
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  15 +-
 .../db/service/DataNodeInternalRPCService.java     |  12 +-
 .../DataNodeInternalRPCServiceImplTest.java        |   8 +-
 .../org/apache/iotdb/commons/concurrent/Await.java |  53 ++++++
 .../commons/concurrent/AwaitTimeoutException.java  |  31 ++++
 .../iotdb/commons/concurrent/ConditionAwaiter.java | 154 +++++++++++++++++
 .../java/org/apache/iotdb/commons/AwaitTest.java   | 190 +++++++++++++++++++++
 8 files changed, 461 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 9fc5fcb436c..28422bbd837 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -193,6 +193,7 @@ import 
org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
 import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
 import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
 import org.apache.iotdb.db.service.DataNode;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import 
org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -416,6 +417,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
+  private final DataNodeContext dataNodeContext;
+
   private final ExecutorService schemaExecutor =
       new WrappedThreadPoolExecutor(
           0,
@@ -430,10 +433,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private static final String SYSTEM = "system";
 
-  public DataNodeInternalRPCServiceImpl() {
+  public DataNodeInternalRPCServiceImpl(DataNodeContext dataNodeContext) {
     super();
     partitionFetcher = ClusterPartitionFetcher.getInstance();
     schemaFetcher = ClusterSchemaFetcher.getInstance();
+    this.dataNodeContext = dataNodeContext;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7004d09872d..568f2c266e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -183,14 +183,16 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
   private static final String REGISTER_INTERRUPTION =
       "Unexpected interruption when waiting to register to the cluster";
 
-  private boolean schemaRegionConsensusStarted = false;
-  private boolean dataRegionConsensusStarted = false;
+  private volatile boolean schemaRegionConsensusStarted = false;
+  private volatile boolean dataRegionConsensusStarted = false;
   private static Thread watcherThread;
+  private DataNodeContext context;
 
   public DataNode() {
     super("DataNode");
     // We do not init anything here, so that we can re-initialize the instance 
in IT.
     DataNodeHolder.INSTANCE = this;
+    context = new DataNodeContext();
   }
 
   public static void reinitializeStatics() {
@@ -934,7 +936,8 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
 
   protected void registerInternalRPCService() throws StartupException {
     // Start InternalRPCService to indicate that the current DataNode can 
accept cluster scheduling
-    registerManager.register(DataNodeInternalRPCService.getInstance());
+    DataNodeInternalRPCService instance = 
DataNodeInternalRPCService.getInstance();
+    registerManager.register();
   }
 
   // make it easier for users to extend ClientRPCServiceImpl to export more 
rpc services
@@ -1373,4 +1376,10 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
       // Empty constructor
     }
   }
+
+  public class DataNodeContext {
+    public boolean isAllConsensusStarted() {
+      return dataRegionConsensusStarted && schemaRegionConsensusStarted;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 5de1041a9a0..f3bf8e507c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
 import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
@@ -44,6 +45,7 @@ public class DataNodeInternalRPCService extends ThriftService
   private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final AtomicReference<DataNodeInternalRPCServiceImpl> impl = new 
AtomicReference<>();
+  private DataNodeContext dataNodeContext;
 
   private DataNodeInternalRPCService() {}
 
@@ -54,9 +56,9 @@ public class DataNodeInternalRPCService extends ThriftService
 
   @Override
   public void initTProcessor() {
-    impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
+    DataNodeInternalRPCServiceImpl service = getImpl();
     initSyncedServiceImpl(null);
-    processor = new Processor<>(impl.get());
+    processor = new Processor<>(service);
   }
 
   @Override
@@ -109,7 +111,7 @@ public class DataNodeInternalRPCService extends 
ThriftService
   }
 
   public DataNodeInternalRPCServiceImpl getImpl() {
-    impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
+    impl.compareAndSet(null, new 
DataNodeInternalRPCServiceImpl(dataNodeContext));
     return impl.get();
   }
 
@@ -122,4 +124,8 @@ public class DataNodeInternalRPCService extends 
ThriftService
   public static DataNodeInternalRPCService getInstance() {
     return DataNodeInternalRPCServiceHolder.INSTANCE;
   }
+
+  public void setDataNodeContext(DataNodeContext dataNodeContext) {
+    this.dataNodeContext = dataNodeContext;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index 22a2d6cdc46..f601c9bd1fc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.service;
 
+import static org.mockito.Mockito.when;
+
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -50,6 +52,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -79,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import org.mockito.Mockito;
 
 public class DataNodeInternalRPCServiceImplTest {
 
@@ -134,7 +138,9 @@ public class DataNodeInternalRPCServiceImplTest {
         .createLocalPeer(
             
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
             genSchemaRegionPeerList(regionReplicaSet));
-    dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl();
+    DataNodeContext context = Mockito.mock(DataNodeContext.class);
+    when(context.isAllConsensusStarted()).thenReturn(true);
+    dataNodeInternalRPCServiceImpl = new 
DataNodeInternalRPCServiceImpl(context);
   }
 
   @After
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
new file mode 100644
index 00000000000..f08ccd17495
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+/**
+ * Lightweight polling utility for production code. Provides a fluent API 
similar to Awaitility for
+ * waiting until a condition becomes true.
+ *
+ * <pre>{@code
+ * // Wait with timeout
+ * Await.await()
+ *     .atMost(5, TimeUnit.SECONDS)
+ *     .pollInterval(100, TimeUnit.MILLISECONDS)
+ *     .until(() -> isReady());
+ *
+ * // Wait forever (use with caution)
+ * Await.await()
+ *     .forever()
+ *     .pollInterval(1, TimeUnit.SECONDS)
+ *     .until(() -> isReady());
+ *
+ * // Ignore exceptions during polling
+ * Await.await()
+ *     .atMost(30, TimeUnit.SECONDS)
+ *     .ignoreExceptions()
+ *     .until(() -> tryConnect());
+ * }</pre>
+ */
+public final class Await {
+
+  private Await() {}
+
+  public static ConditionAwaiter await() {
+    return new ConditionAwaiter();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
new file mode 100644
index 00000000000..b0d5c98bfe7
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+public class AwaitTimeoutException extends RuntimeException {
+
+  public AwaitTimeoutException(String message) {
+    super(message);
+  }
+
+  public AwaitTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
new file mode 100644
index 00000000000..f88db57f612
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+public class ConditionAwaiter {
+
+  private static final long DEFAULT_POLL_INTERVAL_MS = 100;
+  private static final long DEFAULT_TIMEOUT_MS = 10_000;
+
+  private long timeoutMs = DEFAULT_TIMEOUT_MS;
+  private long pollIntervalMs = DEFAULT_POLL_INTERVAL_MS;
+  private long pollDelayMs = 0;
+  private boolean ignoreAllExceptions = false;
+  private boolean forever = false;
+  private final List<Class<? extends Exception>> ignoredExceptions = new 
ArrayList<>();
+
+  ConditionAwaiter() {}
+
+  public ConditionAwaiter atMost(long time, TimeUnit unit) {
+    this.timeoutMs = unit.toMillis(time);
+    return this;
+  }
+
+  public ConditionAwaiter pollInterval(long time, TimeUnit unit) {
+    this.pollIntervalMs = unit.toMillis(time);
+    return this;
+  }
+
+  public ConditionAwaiter pollDelay(long time, TimeUnit unit) {
+    this.pollDelayMs = unit.toMillis(time);
+    return this;
+  }
+
+  public ConditionAwaiter ignoreExceptions() {
+    this.ignoreAllExceptions = true;
+    return this;
+  }
+
+  public ConditionAwaiter ignoreException(Class<? extends Exception> 
exceptionType) {
+    this.ignoredExceptions.add(exceptionType);
+    return this;
+  }
+
+  public ConditionAwaiter forever() {
+    this.forever = true;
+    return this;
+  }
+
+  public void until(Callable<Boolean> conditionEvaluator) {
+    long startTime = System.currentTimeMillis();
+
+    if (pollDelayMs > 0) {
+      sleep(pollDelayMs);
+    }
+
+    Exception lastException = null;
+    while (true) {
+      try {
+        Boolean result = conditionEvaluator.call();
+        if (Boolean.TRUE.equals(result)) {
+          return;
+        }
+        lastException = null;
+      } catch (Exception e) {
+        if (shouldIgnore(e)) {
+          lastException = e;
+        } else if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          throw new AwaitTimeoutException("Interrupted while awaiting 
condition", e);
+        } else {
+          throw new AwaitTimeoutException("Exception while evaluating 
condition", e);
+        }
+      }
+
+      if (!forever && System.currentTimeMillis() - startTime >= timeoutMs) {
+        String message = String.format("Condition was not met within %d ms", 
timeoutMs);
+        if (lastException != null) {
+          throw new AwaitTimeoutException(message, lastException);
+        }
+        throw new AwaitTimeoutException(message);
+      }
+
+      sleep(pollIntervalMs);
+    }
+  }
+
+  public void untilAsserted(Runnable assertion) {
+    final AssertionErrorHolder holder = new AssertionErrorHolder();
+    try {
+      until(
+          () -> {
+            try {
+              assertion.run();
+              return true;
+            } catch (AssertionError e) {
+              holder.error = e;
+              return false;
+            }
+          });
+    } catch (AwaitTimeoutException e) {
+      if (holder.error != null) {
+        throw new AwaitTimeoutException(e.getMessage(), holder.error);
+      }
+      throw e;
+    }
+  }
+
+  private static final class AssertionErrorHolder {
+    AssertionError error;
+  }
+
+  private boolean shouldIgnore(Exception e) {
+    if (ignoreAllExceptions) {
+      return true;
+    }
+    for (Class<? extends Exception> ignoredType : ignoredExceptions) {
+      if (ignoredType.isInstance(e)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AwaitTimeoutException("Interrupted while awaiting condition", 
e);
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java
new file mode 100644
index 00000000000..ae094c1a806
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons;
+
+import org.apache.iotdb.commons.concurrent.Await;
+import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AwaitTest {
+
+  @Test
+  public void testConditionAlreadyTrue() {
+    Await.await().atMost(1, TimeUnit.SECONDS).until(() -> true);
+  }
+
+  @Test
+  public void testConditionBecomesTrue() {
+    AtomicBoolean flag = new AtomicBoolean(false);
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              }
+              flag.set(true);
+            })
+        .start();
+
+    Await.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInterval(50, TimeUnit.MILLISECONDS)
+        .until(flag::get);
+
+    assertTrue(flag.get());
+  }
+
+  @Test(expected = AwaitTimeoutException.class)
+  public void testTimeout() {
+    Await.await()
+        .atMost(300, TimeUnit.MILLISECONDS)
+        .pollInterval(50, TimeUnit.MILLISECONDS)
+        .until(() -> false);
+  }
+
+  @Test
+  public void testPollDelay() {
+    long start = System.currentTimeMillis();
+
+    Await.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollDelay(200, TimeUnit.MILLISECONDS)
+        .until(() -> true);
+
+    long elapsed = System.currentTimeMillis() - start;
+    assertTrue("Expected at least 200ms delay, got " + elapsed, elapsed >= 
180);
+  }
+
+  @Test
+  public void testIgnoreAllExceptions() {
+    AtomicInteger counter = new AtomicInteger(0);
+
+    Await.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInterval(50, TimeUnit.MILLISECONDS)
+        .ignoreExceptions()
+        .until(
+            () -> {
+              int val = counter.incrementAndGet();
+              if (val < 3) {
+                throw new RuntimeException("not ready yet");
+              }
+              return true;
+            });
+
+    assertTrue(counter.get() >= 3);
+  }
+
+  @Test
+  public void testIgnoreSpecificException() {
+    AtomicInteger counter = new AtomicInteger(0);
+
+    Await.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInterval(50, TimeUnit.MILLISECONDS)
+        .ignoreException(IllegalStateException.class)
+        .until(
+            () -> {
+              int val = counter.incrementAndGet();
+              if (val < 3) {
+                throw new IllegalStateException("not ready");
+              }
+              return true;
+            });
+
+    assertTrue(counter.get() >= 3);
+  }
+
+  @Test
+  public void testNonIgnoredExceptionPropagates() {
+    try {
+      Await.await()
+          .atMost(5, TimeUnit.SECONDS)
+          .ignoreException(IllegalStateException.class)
+          .until(
+              () -> {
+                throw new IllegalArgumentException("unexpected");
+              });
+      fail("Should have thrown");
+    } catch (AwaitTimeoutException e) {
+      assertTrue(e.getCause() instanceof IllegalArgumentException);
+    }
+  }
+
+  @Test
+  public void testUntilAsserted() {
+    AtomicInteger value = new AtomicInteger(0);
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              }
+              value.set(42);
+            })
+        .start();
+
+    Await.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInterval(50, TimeUnit.MILLISECONDS)
+        .untilAsserted(() -> assertEquals(42, value.get()));
+  }
+
+  @Test
+  public void testForever() {
+    AtomicInteger counter = new AtomicInteger(0);
+
+    Await.await()
+        .forever()
+        .pollInterval(10, TimeUnit.MILLISECONDS)
+        .until(() -> counter.incrementAndGet() >= 5);
+
+    assertTrue(counter.get() >= 5);
+  }
+
+  @Test
+  public void testTimeoutMessageIncludesLastException() {
+    try {
+      Await.await()
+          .atMost(200, TimeUnit.MILLISECONDS)
+          .pollInterval(50, TimeUnit.MILLISECONDS)
+          .ignoreExceptions()
+          .until(
+              () -> {
+                throw new RuntimeException("still failing");
+              });
+      fail("Should have thrown");
+    } catch (AwaitTimeoutException e) {
+      assertTrue(e.getCause() instanceof RuntimeException);
+      assertEquals("still failing", e.getCause().getMessage());
+    }
+  }
+}

Reply via email to