This is an automated email from the ASF dual-hosted git repository.
morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c48cde7877b [fix](connection) Prevent timeout checker from stopping after an exception (#65040)
c48cde7877b is described below
commit c48cde7877be3e5672fdcc57e3eea44b6faa6602
Author: feiniaofeiafei <[email protected]>
AuthorDate: Wed Jul 1 11:22:07 2026 +0800
[fix](connection) Prevent timeout checker from stopping after an exception (#65040)
Problem Summary:
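The connection timeout checker is scheduled with `scheduleAtFixedRate`.
If any unchecked exception or `Error` escapes from a single run of the task,
`ScheduledThreadPoolExecutor` suppresses all subsequent executions; the
failure is captured in the task's `ScheduledFuture` rather than logged, so
the checker can stop without any visible error. As a result, expired
connections may no longer be cleaned up.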
This PR:
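- Catches failures per `ConnectContext` in `ConnectPoolMgr.timeoutChecker`,
so one broken connection does not prevent the remaining connections from
being checked.
- Adds an outer exception boundary in `ConnectScheduler.TimeoutChecker.run()`
so the periodic timeout checker stays alive even if a pool-level check fails.
- Logs each failure with its stack trace; per-connection failures also
include the connection ID and user to aid diagnosis.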
- Adds a unit test verifying that timeout checking continues after one
context throws an exception.
This applies to both MySQL and Arrow Flight SQL connection pools.
---
.../java/org/apache/doris/qe/ConnectPoolMgr.java | 7 ++++-
.../java/org/apache/doris/qe/ConnectScheduler.java | 10 ++++--
.../org/apache/doris/qe/ConnectSchedulerTest.java | 36 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
index b64bcdd9284..643908e52c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
@@ -53,7 +53,12 @@ public class ConnectPoolMgr {
public void timeoutChecker(long now) {
for (ConnectContext connectContext : connectionMap.values()) {
- connectContext.checkTimeout(now);
+ try {
+ connectContext.checkTimeout(now);
+ } catch (Throwable t) {
+ LOG.warn("failed to check timeout for connection,
connectionId: {}, user: {}",
+ connectContext.getConnectionId(),
connectContext.getQualifiedUser(), t);
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 32ea481fa9f..b22e89cf209 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -160,9 +160,13 @@ public class ConnectScheduler {
private class TimeoutChecker extends TimerTask {
@Override
public void run() {
- long now = System.currentTimeMillis();
- connectPoolMgr.timeoutChecker(now);
- flightSqlConnectPoolMgr.timeoutChecker(now);
+ try {
+ long now = System.currentTimeMillis();
+ connectPoolMgr.timeoutChecker(now);
+ flightSqlConnectPoolMgr.timeoutChecker(now);
+ } catch (Throwable t) {
+ LOG.warn("failed to check connection timeout", t);
+ }
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
index 71a956bfbe0..21d126880f7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ConnectSchedulerTest {
@@ -98,4 +99,39 @@ public class ConnectSchedulerTest {
ConnectContext context = new ConnectContext();
Assert.assertTrue(scheduler.submit(context));
}
+
+ @Test
+ public void testTimeoutCheckerContinuesAfterContextException() {
+ ConnectPoolMgr connectPoolMgr = new ConnectPoolMgr(10);
+ ThrowingConnectContext throwingContext = new ThrowingConnectContext();
+ CountingConnectContext countingContext = new CountingConnectContext();
+ throwingContext.setConnectionId(1);
+ countingContext.setConnectionId(2);
+
connectPoolMgr.getConnectionMap().put(throwingContext.getConnectionId(),
throwingContext);
+
connectPoolMgr.getConnectionMap().put(countingContext.getConnectionId(),
countingContext);
+
+ connectPoolMgr.timeoutChecker(System.currentTimeMillis());
+
+ Assert.assertEquals(1, throwingContext.checkCount.get());
+ Assert.assertEquals(1, countingContext.checkCount.get());
+ }
+
+ private static class ThrowingConnectContext extends ConnectContext {
+ private final AtomicInteger checkCount = new AtomicInteger(0);
+
+ @Override
+ public void checkTimeout(long now) {
+ checkCount.incrementAndGet();
+ throw new RuntimeException("mock check timeout exception");
+ }
+ }
+
+ private static class CountingConnectContext extends ConnectContext {
+ private final AtomicInteger checkCount = new AtomicInteger(0);
+
+ @Override
+ public void checkTimeout(long now) {
+ checkCount.incrementAndGet();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]