This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de6c66ea805760f1550ae1fa348630edd9f17256
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Thu Sep 21 15:30:45 2023 +0800

    [FLINK-33000][sql-gateway] SqlGatewayServiceITCase should utilize TestExecutorExtension instead of using a ThreadFactory
---
 .../gateway/service/SqlGatewayServiceITCase.java   | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index b74db280f06..3e62cdf90d1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -66,6 +66,7 @@ import org.apache.flink.table.planner.utils.TableFunc0;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -96,9 +97,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -125,7 +127,7 @@ public class SqlGatewayServiceITCase {
 
     @RegisterExtension
     @Order(1)
-    public static final MiniClusterExtension MINI_CLUSTER =
+    static final MiniClusterExtension MINI_CLUSTER =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(2)
@@ -133,9 +135,19 @@ public class SqlGatewayServiceITCase {
 
     @RegisterExtension
     @Order(2)
-    public static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
+    static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
             new 
SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
 
+    @RegisterExtension
+    @Order(3)
+    static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "SqlGatewayService Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
+
     private static SessionManagerImpl sessionManager;
     private static SqlGatewayServiceImpl service;
 
@@ -143,9 +155,6 @@ public class SqlGatewayServiceITCase {
             SessionEnvironment.newBuilder()
                     .setSessionEndpointVersion(MockedEndpointVersion.V1)
                     .build();
-    private final ThreadFactory threadFactory =
-            new ExecutorThreadFactory(
-                    "SqlGatewayService Test Pool", 
IgnoreExceptionHandler.INSTANCE);
 
     @BeforeAll
     static void setUp() {
@@ -773,12 +782,10 @@ public class SqlGatewayServiceITCase {
                     service.getSession(sessionHandle)
                             .getOperationManager()
                             .getOperation(operationHandle));
-            threadFactory
-                    .newThread(() -> service.cancelOperation(sessionHandle, 
operationHandle))
-                    .start();
-            threadFactory
-                    .newThread(() -> service.closeOperation(sessionHandle, 
operationHandle))
-                    .start();
+
+            ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();
+            executor.submit(() -> service.cancelOperation(sessionHandle, 
operationHandle));
+            executor.submit(() -> service.closeOperation(sessionHandle, 
operationHandle));
         }
 
         CommonTestUtils.waitUtil(
@@ -800,16 +807,16 @@ public class SqlGatewayServiceITCase {
         int submitThreadsNum = 100;
         CountDownLatch latch = new CountDownLatch(submitThreadsNum);
         for (int i = 0; i < submitThreadsNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 try {
                                     submitDefaultOperation(sessionHandle, () 
-> {});
                                 } finally {
                                     latch.countDown();
                                 }
-                            })
-                    .start();
+                            });
         }
         manager.close();
         latch.await();
@@ -823,8 +830,9 @@ public class SqlGatewayServiceITCase {
         CountDownLatch terminateRunning = new CountDownLatch(1);
         SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
         for (int i = 0; i < count; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () ->
                                     service.submitOperation(
                                             sessionHandle,
@@ -832,8 +840,7 @@ public class SqlGatewayServiceITCase {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
                                                 return getDefaultResultSet();
-                                            }))
-                    .start();
+                                            }));
         }
         startRunning.await();
         service.getSession(sessionHandle).getOperationManager().close();
@@ -1034,7 +1041,8 @@ public class SqlGatewayServiceITCase {
                             schemaFetcherIsRunning.countDown();
                             return 
service.getOperationResultSchema(sessionHandle, operationHandle);
                         });
-        threadFactory.newThread(task).start();
+
+        EXECUTOR_EXTENSION.getExecutor().submit(task);
 
         schemaFetcherIsRunning.await();
         operationIsRunning.countDown();
@@ -1048,16 +1056,17 @@ public class SqlGatewayServiceITCase {
             Condition<String> condition) {
 
         List<RowData> actual = new ArrayList<>();
-        threadFactory
-                .newThread(
+
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(
                         () -> {
                             try {
                                 cancelOrClose.run();
                             } catch (Exception e) {
                                 // ignore
                             }
-                        })
-                .start();
+                        });
 
         assertThatThrownBy(
                         () -> {

Reply via email to