This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4d413dffeae0fb4076c4305396e1c32629395e74 Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com> AuthorDate: Sun Oct 25 20:02:35 2020 +0800 [hotfix] Add a waitUntil() method to the CommonTestUtils. --- .../flink/core/testutils/CommonTestUtils.java | 29 ++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index 2eb4506..6738412 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -29,8 +29,11 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; +import java.time.Duration; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -181,4 +184,30 @@ public class CommonTestUtils { assertThat(e.getMessage(), containsString(msg)); } } + + /** + * Wait util the given condition is met or timeout. + * + * @param condition the condition to wait for. + * @param timeout the maximum time to wait for the condition to become true. + * @param errorMsg the error message to include in the <code>TimeoutException</code> + * if the condition was not met before timeout. + * @throws TimeoutException if the condition is not met before timeout. + * @throws InterruptedException if the thread is interrupted. + */ + @SuppressWarnings("BusyWait") + public static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg) + throws TimeoutException, InterruptedException { + long timeoutMs = timeout.toMillis(); + if (timeoutMs <= 0) { + throw new IllegalArgumentException("The timeout must be positive."); + } + long startingTime = System.currentTimeMillis(); + while (!condition.get() && System.currentTimeMillis() - startingTime < timeoutMs) { + Thread.sleep(1); + } + if (!condition.get()) { + throw new TimeoutException(errorMsg); + } + } }