This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e68b40d5cf9146a5bccadc67d9636f987003c0f
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Thu Sep 21 17:55:51 2023 +0800

    [FLINK-33000][sql-gateway] ResultFetcherTest should utilize TestExecutorExtension instead of using a ThreadFactory
---
 .../gateway/service/result/ResultFetcherTest.java  | 51 +++++++++++++++-------
 1 file changed, 35 insertions(+), 16 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index c5b8bf3c6a3..e95d9fdb5a1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -38,6 +39,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.commons.collections.iterators.IteratorChain;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -51,7 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -68,8 +71,14 @@ class ResultFetcherTest {
     private static ResolvedSchema schema;
     private static List<RowData> data;
 
-    private final ThreadFactory threadFactory =
-            new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE);
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "Result Fetcher Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
 
     @BeforeAll
     static void setUp() {
@@ -243,8 +252,9 @@ class ResultFetcherTest {
 
         AtomicReference<Boolean> payloadHasData = new AtomicReference<>(true);
         for (int i = 0; i < fetchThreadNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 ResultSet resultSet =
                                         fetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1);
@@ -253,10 +263,19 @@ class ResultFetcherTest {
                                     payloadHasData.set(false);
                                 }
 
-                                rows.put(Thread.currentThread().getId(), resultSet.getData());
+                                rows.compute(
+                                        Thread.currentThread().getId(),
+                                        (k, v) -> {
+                                            if (v == null) {
+                                                return resultSet.getData();
+                                            } else {
+                                                v.addAll(resultSet.getData());
+                                                return v;
+                                            }
+                                        });
+
                                 latch.countDown();
-                            })
-                    .start();
+                            });
         }
 
         latch.await();
@@ -290,8 +309,9 @@ class ResultFetcherTest {
 
         long testToken = token;
         AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
-        threadFactory
-                .newThread(
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(
                         () -> {
                             // Should meet EOS in the end.
                             long nextToken = testToken;
@@ -304,8 +324,7 @@ class ResultFetcherTest {
                                 nextToken = checkNotNull(resultSet.getNextToken());
                             }
                             meetEnd.set(true);
-                        })
-                .start();
+                        });
 
         CommonTestUtils.waitUtil(
                 meetEnd::get,
@@ -436,8 +455,9 @@ class ResultFetcherTest {
 
         List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
         for (int i = 0; i < fetchThreadNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
 
@@ -445,8 +465,7 @@ class ResultFetcherTest {
                                     isEqual.set(false);
                                 }
                                 latch.countDown();
-                            })
-                    .start();
+                            });
         }
 
         latch.await();

Reply via email to