This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 32b37018185 [FLINK-31351][sql-gateway] Don't stop the stuck thread by force 32b37018185 is described below commit 32b370181853f4129fd237c6a57491863a7e8b8c Author: Shengkai <1059623...@qq.com> AuthorDate: Wed Mar 8 12:12:05 2023 +0800 [FLINK-31351][sql-gateway] Don't stop the stuck thread by force This closes #22127 --- .../service/operation/OperationManager.java | 69 ++++++++++++------ .../gateway/service/utils/SqlCancelException.java | 29 ++++++++ .../service/operation/OperationManagerTest.java | 81 +++++++++++++++------- 3 files changed, 131 insertions(+), 48 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java index 8b239abc771..37a9363c0f2 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java @@ -30,7 +30,11 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.result.NotReadyResult; import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlCancelException; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +48,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; +import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -183,13 +188,14 @@ public class OperationManager { /** Closes the {@link OperationManager} and all operations. */ public void close() { stateLock.writeLock().lock(); + Exception closeException = null; try { isRunning = false; - for (Operation operation : submittedOperations.values()) { - operation.close(); - } - submittedOperations.clear(); + IOUtils.closeAll(submittedOperations.values(), Throwable.class); + } catch (Exception e) { + closeException = e; } finally { + submittedOperations.clear(); stateLock.writeLock().unlock(); } // wait all operations closed @@ -201,13 +207,19 @@ public class OperationManager { operationLock.release(); } LOG.debug("Closes the Operation Manager."); + if (closeException != null) { + throw new SqlExecutionException( + "Failed to close the OperationManager.", closeException); + } } // ------------------------------------------------------------------------------------------- /** Operation to manage the execution, results and so on. 
*/ @VisibleForTesting - public class Operation { + public class Operation implements AutoCloseable { + + private static final long WAIT_CLEAN_UP_MILLISECONDS = 5_000; private final OperationHandle operationHandle; @@ -387,7 +399,7 @@ public class OperationManager { private void closeResources() { if (invocation != null && !invocation.isDone()) { invocation.cancel(true); - stopExecutionByForce(invocation); + waitTaskCleanup(invocation); LOG.debug(String.format("Cancel the operation %s.", operationHandle)); } @@ -405,32 +417,23 @@ public class OperationManager { updateState(OperationStatus.ERROR); } - private void stopExecutionByForce(FutureTask<?> invocation) { + private void waitTaskCleanup(FutureTask<?> invocation) { // thread is cleaned async, waiting for a while - Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); + Deadline deadline = Deadline.fromNow(Duration.ofMillis(WAIT_CLEAN_UP_MILLISECONDS)); while (deadline.hasTimeLeft()) { Optional<Thread> threadOptional = getThreadInFuture(invocation); if (!threadOptional.isPresent()) { // thread has been cleaned up return; } + // sleep briefly so the task's thread gets CPU time to finish its cleanup. + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); } Optional<Thread> threadOptional = getThreadInFuture(invocation); - if (threadOptional.isPresent()) { - // we have to use Thread.stop() here, because this can - // guarantee thread to be stopped, even there is some - // potential consistent problem, we are fine with it. - Thread thread = threadOptional.get(); - LOG.info( - "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.", - thread.getName()); - try { - thread.stop(); - } catch (Throwable e) { - // catch all errors to project the sqlserver - LOG.error("Failed to stop thread: " + thread.getName(), e); - } - } + // Currently, SQL Gateway has neither a health reporter to notify users of the + // resource leak nor HA to restart the running process.
So we just dump the thread and + // throw an exception to notify the users. + threadOptional.ifPresent(this::throwExceptionWithThreadStackTrace); } private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) { @@ -445,6 +448,26 @@ public class OperationManager { return Optional.empty(); } } + + private void throwExceptionWithThreadStackTrace(Thread thread) { + StackTraceElement[] stack = thread.getStackTrace(); + StringBuilder stackTraceStr = new StringBuilder(); + for (StackTraceElement e : stack) { + stackTraceStr.append("\tat ").append(e).append("\n"); + } + + String msg = + String.format( + "Operation '%s' did not react to \"Future.cancel(true)\" and " + + "is stuck for %s seconds in method.\n" + + "Thread name: %s, thread state: %s, thread stacktrace:\n%s", + operationHandle, + WAIT_CLEAN_UP_MILLISECONDS / 1000, + thread.getName(), + thread.getState(), + stackTraceStr); + throw new SqlCancelException(msg); + } } // ------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java new file mode 100644 index 00000000000..2e92a90a9b9 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.utils; + +/** Thrown when an executing operation does not stop after being canceled. */ +public class SqlCancelException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public SqlCancelException(String msg) { + super(msg); + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java index e4d95f1bae1..6ccdd0bd2b0 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java @@ -32,11 +32,13 @@ import org.apache.flink.table.gateway.api.results.ResultSetImpl; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.api.utils.ThreadUtils; import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; +import org.apache.flink.table.gateway.service.utils.SqlCancelException; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Duration; @@
-52,7 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link OperationManager}. */ -public class OperationManagerTest { +class OperationManagerTest { private static final ExecutorService EXECUTOR_SERVICE = ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test"); @@ -64,8 +66,8 @@ public class OperationManagerTest { new ExecutorThreadFactory( "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE); - @BeforeAll - public static void setUp() { + @BeforeEach + void setUp() { operationManager = new OperationManager(EXECUTOR_SERVICE); defaultResultSet = new ResultSetImpl( @@ -79,14 +81,18 @@ public class OperationManagerTest { ResultKind.SUCCESS_WITH_CONTENT); } + @AfterEach + void cleanEach() { + operationManager.close(); + } + @AfterAll - public static void cleanUp() { + static void cleanUp() { EXECUTOR_SERVICE.shutdown(); - operationManager.close(); } @Test - public void testRunOperationAsynchronously() throws Exception { + void testRunOperationAsynchronously() throws Exception { OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet); assertThat(operationManager.getOperationInfo(operationHandle).getStatus()) @@ -100,7 +106,7 @@ public class OperationManagerTest { } @Test - public void testRunOperationSynchronously() throws Exception { + void testRunOperationSynchronously() throws Exception { OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet); operationManager.awaitOperationTermination(operationHandle); @@ -112,7 +118,7 @@ public class OperationManagerTest { } @Test - public void testCancelOperation() throws Exception { + void testCancelOperation() throws Exception { CountDownLatch endRunningLatch = new CountDownLatch(1); OperationHandle operationHandle = operationManager.submitOperation( @@ -129,33 +135,58 @@ public class OperationManagerTest { } @Test - public void 
testCancelOperationByForce() throws Exception { - AtomicReference<Throwable> exception = new AtomicReference<>(null); + void testCancelUninterruptedOperation() throws Exception { + AtomicReference<Boolean> isRunning = new AtomicReference<>(false); OperationHandle operationHandle = operationManager.submitOperation( () -> { - try { - // mock cpu busy task that doesn't interrupt system call - while (true) {} - } catch (Throwable t) { - exception.set(t); - throw t; + // mock cpu busy task that doesn't interrupt system call + while (true) { + isRunning.compareAndSet(false, true); } }); - - threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start(); - operationManager.awaitOperationTermination(operationHandle); + CommonTestUtils.waitUtil( + isRunning::get, Duration.ofSeconds(10), "Failed to start up the task."); + assertThatThrownBy(() -> operationManager.cancelOperation(operationHandle)) + .satisfies( + FlinkAssertions.anyCauseMatches( + SqlCancelException.class, + String.format( + "Operation '%s' did not react to \"Future.cancel(true)\" and " + + "is stuck for %s seconds in method.\n", + operationHandle, 5))); assertThat(operationManager.getOperationInfo(operationHandle).getStatus()) .isEqualTo(OperationStatus.CANCELED); + } + + @Test + void testCloseUninterruptedOperation() throws Exception { + AtomicReference<Boolean> isRunning = new AtomicReference<>(false); + for (int i = 0; i < 10; i++) { + threadFactory + .newThread( + () -> { + operationManager.submitOperation( + () -> { + // mock cpu busy task that doesn't interrupt system call + while (true) { + isRunning.compareAndSet(false, true); + } + }); + }) + .start(); + } CommonTestUtils.waitUtil( - () -> exception.get() != null, - Duration.ofSeconds(10), - "Failed to kill the task with infinite loop."); + isRunning::get, Duration.ofSeconds(10), "Failed to start up the task."); + + assertThatThrownBy(() -> operationManager.close()) + 
.satisfies(FlinkAssertions.anyCauseMatches(SqlCancelException.class)); + assertThat(operationManager.getOperationCount()).isEqualTo(0); } @Test - public void testCloseOperation() throws Exception { + void testCloseOperation() throws Exception { CountDownLatch endRunningLatch = new CountDownLatch(1); OperationHandle operationHandle = operationManager.submitOperation( @@ -177,7 +208,7 @@ public class OperationManagerTest { } @Test - public void testRunOperationSynchronouslyWithError() { + void testRunOperationSynchronouslyWithError() { OperationHandle operationHandle = operationManager.submitOperation( () -> {