This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit de6c66ea805760f1550ae1fa348630edd9f17256
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Thu Sep 21 15:30:45 2023 +0800

    [FLINK-33000][sql-gateway] SqlGatewayServiceITCase should utilize TestExecutorExtension instead of using a ThreadFactory
---
 .../gateway/service/SqlGatewayServiceITCase.java | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index b74db280f06..3e62cdf90d1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -66,6 +66,7 @@ import org.apache.flink.table.planner.utils.TableFunc0;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -96,9 +97,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;

 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -125,7 +127,7 @@ public class SqlGatewayServiceITCase {

     @RegisterExtension
     @Order(1)
-    public static final MiniClusterExtension MINI_CLUSTER =
+    static final MiniClusterExtension MINI_CLUSTER =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(2)
@@ -133,9 +135,19 @@ public class SqlGatewayServiceITCase {

     @RegisterExtension
     @Order(2)
-    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+    static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
             new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    @RegisterExtension
+    @Order(3)
+    static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "SqlGatewayService Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));

     private static SessionManagerImpl sessionManager;
     private static SqlGatewayServiceImpl service;

@@ -143,9 +155,6 @@ public class SqlGatewayServiceITCase {
             SessionEnvironment.newBuilder()
                     .setSessionEndpointVersion(MockedEndpointVersion.V1)
                     .build();
-    private final ThreadFactory threadFactory =
-            new ExecutorThreadFactory(
-                    "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);

     @BeforeAll
     static void setUp() {
@@ -773,12 +782,10 @@ public class SqlGatewayServiceITCase {
                     service.getSession(sessionHandle)
                             .getOperationManager()
                             .getOperation(operationHandle));
-            threadFactory
-                    .newThread(() -> service.cancelOperation(sessionHandle, operationHandle))
-                    .start();
-            threadFactory
-                    .newThread(() -> service.closeOperation(sessionHandle, operationHandle))
-                    .start();
+
+            ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();
+            executor.submit(() -> service.cancelOperation(sessionHandle, operationHandle));
+            executor.submit(() -> service.closeOperation(sessionHandle, operationHandle));
         }

         CommonTestUtils.waitUtil(
@@ -800,16 +807,16 @@ public class SqlGatewayServiceITCase {
         int submitThreadsNum = 100;
         CountDownLatch latch = new CountDownLatch(submitThreadsNum);
         for (int i = 0; i < submitThreadsNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 try {
                                     submitDefaultOperation(sessionHandle, () -> {});
                                 } finally {
                                     latch.countDown();
                                 }
-                            })
-                    .start();
+                            });
         }
         manager.close();
         latch.await();
@@ -823,8 +830,9 @@ public class SqlGatewayServiceITCase {
         CountDownLatch terminateRunning = new CountDownLatch(1);
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
         for (int i = 0; i < count; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () ->
                                     service.submitOperation(
                                             sessionHandle,
@@ -832,8 +840,7 @@ public class SqlGatewayServiceITCase {
                                                 startRunning.countDown();
                                                 terminateRunning.await();
                                                 return getDefaultResultSet();
-                                            }))
-                    .start();
+                                            }));
         }
         startRunning.await();
         service.getSession(sessionHandle).getOperationManager().close();
@@ -1034,7 +1041,8 @@ public class SqlGatewayServiceITCase {
                             schemaFetcherIsRunning.countDown();
                             return service.getOperationResultSchema(sessionHandle, operationHandle);
                         });
-        threadFactory.newThread(task).start();
+
+        EXECUTOR_EXTENSION.getExecutor().submit(task);

         schemaFetcherIsRunning.await();
         operationIsRunning.countDown();
@@ -1048,16 +1056,17 @@ public class SqlGatewayServiceITCase {
             Condition<String> condition) {

         List<RowData> actual = new ArrayList<>();
-        threadFactory
-                .newThread(
+
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(
                         () -> {
                             try {
                                 cancelOrClose.run();
                             } catch (Exception e) {
                                 // ignore
                             }
-                        })
-                .start();
+                        });

         assertThatThrownBy(
                 () -> {