This is an automated email from the ASF dual-hosted git repository.
jiangmaolin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0731b44c62a Fix the connection leak caused by rollback failure in
Proxy (#35867)
0731b44c62a is described below
commit 0731b44c62a6a013988359458b530b57d855e954
Author: Raigor <[email protected]>
AuthorDate: Thu Jul 3 17:08:45 2025 +0800
Fix the connection leak caused by rollback failure in Proxy (#35867)
* Fix the connection leak caused by rollback failure in Proxy
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 1 +
.../connector/ProxyDatabaseConnectionManager.java | 38 ++++++++++++++++++----
.../netty/FrontendChannelInboundHandler.java | 23 ++++++++++++-
3 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index e19fd37e292..abe4f860882 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -72,6 +72,7 @@
1. Encrypt: Resolve rewrite issue in nested concat function -
[35815](https://github.com/apache/shardingsphere/pull/35815)
1. JDBC: Fix the issue where cached connections in
DriverDatabaseConnectionManager were not released in time -
[35834](https://github.com/apache/shardingsphere/pull/35834)
1. Proxy: Fix column length for PostgreSQL string binary protocol value -
[35840](https://github.com/apache/shardingsphere/pull/35840)
+1. Proxy: Fix the connection leak caused by rollback failure in Proxy -
[35867](https://github.com/apache/shardingsphere/pull/35867)
### Change Logs
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
index c82dd4c01ab..517df05ef4c 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
@@ -22,6 +22,7 @@ import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
@@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Database connection manager of ShardingSphere-Proxy.
*/
+@Slf4j
@RequiredArgsConstructor
@Getter
public final class ProxyDatabaseConnectionManager implements
DatabaseConnectionManager<Connection> {
@@ -71,6 +73,8 @@ public final class ProxyDatabaseConnectionManager implements
DatabaseConnectionM
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Object closeLock = new Object();
+
@SuppressWarnings("rawtypes")
private final Map<ShardingSphereRule, TransactionHook> transactionHooks =
OrderedSPILoader.getServices(
TransactionHook.class,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
@@ -259,7 +263,7 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
* @throws BackendConnectionException backend connection exception
*/
public void closeExecutionResources() throws BackendConnectionException {
- synchronized (this) {
+ synchronized (closeLock) {
Collection<Exception> result = new
LinkedList<>(closeHandlers(false));
if
(!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext())))
{
result.addAll(closeHandlers(true));
@@ -277,12 +281,16 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
/**
* Close all resources.
+ *
+ * @return exceptions occurred during closing resources
*/
- public void closeAllResources() {
- synchronized (this) {
+ public Collection<SQLException> closeAllResources() {
+ synchronized (closeLock) {
closed.set(true);
- closeHandlers(true);
- closeConnections(true);
+ Collection<SQLException> result = new LinkedList<>();
+ result.addAll(closeHandlers(true));
+ result.addAll(closeConnections(true));
+ return result;
}
}
@@ -326,9 +334,17 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
if (forceRollback &&
connectionSession.getTransactionStatus().isInTransaction()) {
each.rollback();
}
- each.close();
} catch (final SQLException ex) {
result.add(ex);
+ } finally {
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ if (!isClosed(each)) {
+ log.warn("Close connection {} failed.", each, ex);
+ }
+ result.add(ex);
+ }
}
}
cachedConnections.clear();
@@ -339,6 +355,16 @@ public final class ProxyDatabaseConnectionManager
implements DatabaseConnectionM
return result;
}
+ private boolean isClosed(final Connection connection) {
+ try {
+ if (connection.isClosed()) {
+ return true;
+ }
+ } catch (final SQLException ignored) {
+ }
+ return false;
+ }
+
private void resetSessionVariablesIfNecessary(final Collection<Connection>
values, final Collection<SQLException> exceptions) {
if (connectionSession.getRequiredSessionVariableRecorder().isEmpty()
|| values.isEmpty()) {
return;
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d3ea1b3a4b0..4896d220373 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -33,6 +33,8 @@ import
org.apache.shardingsphere.proxy.frontend.executor.UserExecutorGroup;
import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
+import java.sql.SQLException;
+import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -105,11 +107,30 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
- connectionSession.getDatabaseConnectionManager().closeAllResources();
+
processCloseExceptions(connectionSession.getDatabaseConnectionManager().closeAllResources());
Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect);
databaseProtocolFrontendEngine.release(connectionSession);
}
+ private void processCloseExceptions(final Collection<SQLException>
exceptions) {
+ if (exceptions.isEmpty()) {
+ return;
+ }
+ SQLException ex = new SQLException("");
+ for (SQLException each : exceptions) {
+ ex.setNextException(each);
+ }
+ processException(ex);
+ }
+
+ private void processException(final Exception cause) {
+ if (ExpectedExceptions.isExpected(cause.getClass())) {
+ log.debug("Exception occur: ", cause);
+ } else {
+ log.error("Exception occur: ", cause);
+ }
+ }
+
@Override
public void channelWritabilityChanged(final ChannelHandlerContext context)
{
if (context.channel().isWritable()) {