This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 32b37018185 [FLINK-31351][sql-gateway] Don't stop the stuck thread by force
32b37018185 is described below

commit 32b370181853f4129fd237c6a57491863a7e8b8c
Author: Shengkai <1059623...@qq.com>
AuthorDate: Wed Mar 8 12:12:05 2023 +0800

    [FLINK-31351][sql-gateway] Don't stop the stuck thread by force
    
    This closes #22127
---
 .../service/operation/OperationManager.java        | 69 ++++++++++++------
 .../gateway/service/utils/SqlCancelException.java  | 29 ++++++++
 .../service/operation/OperationManagerTest.java    | 81 +++++++++++++++-------
 3 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 8b239abc771..37a9363c0f2 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -30,7 +30,11 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.result.NotReadyResult;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlCancelException;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +48,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -183,13 +188,14 @@ public class OperationManager {
     /** Closes the {@link OperationManager} and all operations. */
     public void close() {
         stateLock.writeLock().lock();
+        Exception closeException = null;
         try {
             isRunning = false;
-            for (Operation operation : submittedOperations.values()) {
-                operation.close();
-            }
-            submittedOperations.clear();
+            IOUtils.closeAll(submittedOperations.values(), Throwable.class);
+        } catch (Exception e) {
+            closeException = e;
         } finally {
+            submittedOperations.clear();
             stateLock.writeLock().unlock();
         }
         // wait all operations closed
@@ -201,13 +207,19 @@ public class OperationManager {
             operationLock.release();
         }
         LOG.debug("Closes the Operation Manager.");
+        if (closeException != null) {
+            throw new SqlExecutionException(
+                    "Failed to close the OperationManager.", closeException);
+        }
     }
 
     // -------------------------------------------------------------------------------------------
 
     /** Operation to manage the execution, results and so on. */
     @VisibleForTesting
-    public class Operation {
+    public class Operation implements AutoCloseable {
+
+        private static final long WAIT_CLEAN_UP_MILLISECONDS = 5_000;
 
         private final OperationHandle operationHandle;
 
@@ -387,7 +399,7 @@ public class OperationManager {
         private void closeResources() {
             if (invocation != null && !invocation.isDone()) {
                 invocation.cancel(true);
-                stopExecutionByForce(invocation);
+                waitTaskCleanup(invocation);
                 LOG.debug(String.format("Cancel the operation %s.", operationHandle));
             }
 
@@ -405,32 +417,23 @@ public class OperationManager {
             updateState(OperationStatus.ERROR);
         }
 
-        private void stopExecutionByForce(FutureTask<?> invocation) {
+        private void waitTaskCleanup(FutureTask<?> invocation) {
             // thread is cleaned async, waiting for a while
-            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+            Deadline deadline = Deadline.fromNow(Duration.ofMillis(WAIT_CLEAN_UP_MILLISECONDS));
             while (deadline.hasTimeLeft()) {
                 Optional<Thread> threadOptional = getThreadInFuture(invocation);
                 if (!threadOptional.isPresent()) {
                     // thread has been cleaned up
                     return;
                 }
+                // try to release the use of the processor to let the task finish its cleanup.
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
             }
             Optional<Thread> threadOptional = getThreadInFuture(invocation);
-            if (threadOptional.isPresent()) {
-                // we have to use Thread.stop() here, because this can
-                // guarantee thread to be stopped, even there is some
-                // potential consistent problem, we are fine with it.
-                Thread thread = threadOptional.get();
-                LOG.info(
-                        "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.",
-                        thread.getName());
-                try {
-                    thread.stop();
-                } catch (Throwable e) {
-                    // catch all errors to project the sqlserver
-                    LOG.error("Failed to stop thread: " + thread.getName(), e);
-                }
-            }
+            // Currently, SQL Gateway still doesn't have health reporter to notify the users the
+            // resource leak or HA to restart the running process. So we just dump the thread and
+            // throw an exception to notify the users.
+            threadOptional.ifPresent(this::throwExceptionWithThreadStackTrace);
         }
 
         private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) {
@@ -445,6 +448,26 @@ public class OperationManager {
                 return Optional.empty();
             }
         }
+
+        private void throwExceptionWithThreadStackTrace(Thread thread) {
+            StackTraceElement[] stack = thread.getStackTrace();
+            StringBuilder stackTraceStr = new StringBuilder();
+            for (StackTraceElement e : stack) {
+                stackTraceStr.append("\tat ").append(e).append("\n");
+            }
+
+            String msg =
+                    String.format(
+                            "Operation '%s' did not react to \"Future.cancel(true)\" and "
+                                    + "is stuck for %s seconds in method.\n"
+                                    + "Thread name: %s, thread state: %s, thread stacktrace:\n%s",
+                            operationHandle,
+                            WAIT_CLEAN_UP_MILLISECONDS / 1000,
+                            thread.getName(),
+                            thread.getState(),
+                            stackTraceStr);
+            throw new SqlCancelException(msg);
+        }
     }
 
     // -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java
new file mode 100644
index 00000000000..2e92a90a9b9
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/** Thrown to trigger a canceling of the executing operation. */
+public class SqlCancelException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SqlCancelException(String msg) {
+        super(msg);
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
index e4d95f1bae1..6ccdd0bd2b0 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -32,11 +32,13 @@ import org.apache.flink.table.gateway.api.results.ResultSetImpl;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.api.utils.ThreadUtils;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlCancelException;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -52,7 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link OperationManager}. */
-public class OperationManagerTest {
+class OperationManagerTest {
 
     private static final ExecutorService EXECUTOR_SERVICE =
             ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test");
@@ -64,8 +66,8 @@ public class OperationManagerTest {
             new ExecutorThreadFactory(
                     "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
 
-    @BeforeAll
-    public static void setUp() {
+    @BeforeEach
+    void setUp() {
         operationManager = new OperationManager(EXECUTOR_SERVICE);
         defaultResultSet =
                 new ResultSetImpl(
@@ -79,14 +81,18 @@ public class OperationManagerTest {
                         ResultKind.SUCCESS_WITH_CONTENT);
     }
 
+    @AfterEach
+    void cleanEach() {
+        operationManager.close();
+    }
+
     @AfterAll
-    public static void cleanUp() {
+    static void cleanUp() {
         EXECUTOR_SERVICE.shutdown();
-        operationManager.close();
     }
 
     @Test
-    public void testRunOperationAsynchronously() throws Exception {
+    void testRunOperationAsynchronously() throws Exception {
         OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
 
         assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
@@ -100,7 +106,7 @@ public class OperationManagerTest {
     }
 
     @Test
-    public void testRunOperationSynchronously() throws Exception {
+    void testRunOperationSynchronously() throws Exception {
         OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
         operationManager.awaitOperationTermination(operationHandle);
 
@@ -112,7 +118,7 @@ public class OperationManagerTest {
     }
 
     @Test
-    public void testCancelOperation() throws Exception {
+    void testCancelOperation() throws Exception {
         CountDownLatch endRunningLatch = new CountDownLatch(1);
         OperationHandle operationHandle =
                 operationManager.submitOperation(
@@ -129,33 +135,58 @@ public class OperationManagerTest {
     }
 
     @Test
-    public void testCancelOperationByForce() throws Exception {
-        AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    void testCancelUninterruptedOperation() throws Exception {
+        AtomicReference<Boolean> isRunning = new AtomicReference<>(false);
         OperationHandle operationHandle =
                 operationManager.submitOperation(
                         () -> {
-                            try {
-                                // mock cpu busy task that doesn't interrupt system call
-                                while (true) {}
-                            } catch (Throwable t) {
-                                exception.set(t);
-                                throw t;
+                            // mock cpu busy task that doesn't interrupt system call
+                            while (true) {
+                                isRunning.compareAndSet(false, true);
                             }
                         });
-
-        threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start();
-        operationManager.awaitOperationTermination(operationHandle);
+        CommonTestUtils.waitUtil(
+                isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");
+        assertThatThrownBy(() -> operationManager.cancelOperation(operationHandle))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlCancelException.class,
+                                String.format(
+                                        "Operation '%s' did not react to \"Future.cancel(true)\" and "
+                                                + "is stuck for %s seconds in method.\n",
+                                        operationHandle, 5)));
 
         assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
                 .isEqualTo(OperationStatus.CANCELED);
+    }
+
+    @Test
+    void testCloseUninterruptedOperation() throws Exception {
+        AtomicReference<Boolean> isRunning = new AtomicReference<>(false);
+        for (int i = 0; i < 10; i++) {
+            threadFactory
+                    .newThread(
+                            () -> {
+                                operationManager.submitOperation(
+                                        () -> {
+                                            // mock cpu busy task that doesn't interrupt system call
+                                            while (true) {
+                                                isRunning.compareAndSet(false, true);
+                                            }
+                                        });
+                            })
+                    .start();
+        }
         CommonTestUtils.waitUtil(
-                () -> exception.get() != null,
-                Duration.ofSeconds(10),
-                "Failed to kill the task with infinite loop.");
+                isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");
+
+        assertThatThrownBy(() -> operationManager.close())
+                .satisfies(FlinkAssertions.anyCauseMatches(SqlCancelException.class));
+        assertThat(operationManager.getOperationCount()).isEqualTo(0);
     }
 
     @Test
-    public void testCloseOperation() throws Exception {
+    void testCloseOperation() throws Exception {
         CountDownLatch endRunningLatch = new CountDownLatch(1);
         OperationHandle operationHandle =
                 operationManager.submitOperation(
@@ -177,7 +208,7 @@ public class OperationManagerTest {
     }
 
     @Test
-    public void testRunOperationSynchronouslyWithError() {
+    void testRunOperationSynchronouslyWithError() {
         OperationHandle operationHandle =
                 operationManager.submitOperation(
                         () -> {

Reply via email to