This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e68b40d5cf9146a5bccadc67d9636f987003c0f Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Thu Sep 21 17:55:51 2023 +0800 [FLINK-33000][sql-gateway] ResultFetcherTest should utilize TestExecutorExtension instead of using a ThreadFactory --- .../gateway/service/result/ResultFetcherTest.java | 51 +++++++++++++++------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java index c5b8bf3c6a3..e95d9fdb5a1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java @@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.api.results.FetchOrientation; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -38,6 +39,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.commons.collections.iterators.IteratorChain; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.math.BigDecimal; import java.sql.Timestamp; @@ -51,7 +53,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -68,8 +71,14 @@ class ResultFetcherTest { private static ResolvedSchema schema; private static List<RowData> data; - private final ThreadFactory threadFactory = - new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE); + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>( + () -> + Executors.newCachedThreadPool( + new ExecutorThreadFactory( + "Result Fetcher Test Pool", + IgnoreExceptionHandler.INSTANCE))); @BeforeAll static void setUp() { @@ -243,8 +252,9 @@ class ResultFetcherTest { AtomicReference<Boolean> payloadHasData = new AtomicReference<>(true); for (int i = 0; i < fetchThreadNum; i++) { - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { ResultSet resultSet = fetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1); @@ -253,10 +263,19 @@ class ResultFetcherTest { payloadHasData.set(false); } - rows.put(Thread.currentThread().getId(), resultSet.getData()); + rows.compute( + Thread.currentThread().getId(), + (k, v) -> { + if (v == null) { + return resultSet.getData(); + } else { + v.addAll(resultSet.getData()); + return v; + } + }); + latch.countDown(); - }) - .start(); + }); } latch.await(); @@ -290,8 +309,9 @@ class ResultFetcherTest { long testToken = token; AtomicReference<Boolean> meetEnd = new AtomicReference<>(false); - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { // Should meet EOS in the end. long nextToken = testToken; @@ -304,8 +324,7 @@ class ResultFetcherTest { nextToken = checkNotNull(resultSet.getNextToken()); } meetEnd.set(true); - }) - .start(); + }); CommonTestUtils.waitUtil( meetEnd::get, @@ -436,8 +455,9 @@ class ResultFetcherTest { List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData(); for (int i = 0; i < fetchThreadNum; i++) { - threadFactory - .newThread( + EXECUTOR_EXTENSION + .getExecutor() + .submit( () -> { ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE); @@ -445,8 +465,7 @@ class ResultFetcherTest { isEqual.set(false); } latch.countDown(); - }) - .start(); + }); } latch.await();