This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7e3c6de Revise and fix resources leak caused by #13179 (#13248)
7e3c6de is described below
commit 7e3c6de07cc60eea1ae4319c73b8dc3e01859875
Author: 吴伟杰 <[email protected]>
AuthorDate: Tue Oct 26 18:17:38 2021 +0800
Revise and fix resources leak caused by #13179 (#13248)
* Revise #13179
* Fix resources leak
* Add todo
* Add todo
* Fix tests
---
.../properties/ConfigurationPropertyKey.java | 4 +-
.../distsql/ral/common/enums/VariableEnum.java | 4 +-
.../src/main/resources/conf/server.yaml | 2 +-
.../connection/ConnectionLimitContext.java | 43 +++++++++++-----------
...va => FrontendTooManyConnectionsException.java} | 11 +-----
.../FrontendChannelLimitationInboundHandler.java | 31 ++++++++--------
.../mysql/command/MySQLCommandExecuteEngine.java | 4 +-
.../frontend/mysql/err/MySQLErrPacketFactory.java | 15 ++++----
.../command/OpenGaussCommandExecuteEngine.java | 4 +-
.../command/PostgreSQLCommandExecuteEngine.java | 4 +-
.../frontend/command/CommandExecuteEngine.java | 4 +-
11 files changed, 60 insertions(+), 66 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/properties/ConfigurationPropertyKey.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/properties/ConfigurationPropertyKey.java
index 23dda3e..da95591 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/properties/ConfigurationPropertyKey.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/properties/ConfigurationPropertyKey.java
@@ -116,9 +116,9 @@ public enum ConfigurationPropertyKey implements
TypedPropertyKey {
PROXY_BACKEND_EXECUTOR_SUITABLE("proxy-backend-executor-suitable", "OLAP",
String.class),
/**
- * Proxy connection num limit. less than 0 or equal 0 means no limit.
+ * Less than or equal to 0 means no limitation.
*/
- PROXY_FRONTEND_CONNECTION_LIMIT("proxy-frontend-connection-limit", "-1",
int.class),
+ PROXY_FRONTEND_MAX_CONNECTIONS("proxy-frontend-max-connections", "0",
int.class),
/**
* Whether enable sql federation.
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/enums/VariableEnum.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/enums/VariableEnum.java
index 0ee5e7f..0a1b90b 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/enums/VariableEnum.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/enums/VariableEnum.java
@@ -58,7 +58,7 @@ public enum VariableEnum {
PROXY_BACKEND_EXECUTOR_SUITABLE,
- PROXY_FRONTEND_CONNECTION_LIMIT,
+ PROXY_FRONTEND_MAX_CONNECTIONS,
/**
* Other variables.
@@ -102,6 +102,6 @@ public enum VariableEnum {
VariableEnum.SQL_COMMENT_PARSE_ENABLED,
VariableEnum.PROXY_FRONTEND_EXECUTOR_SIZE,
VariableEnum.PROXY_BACKEND_EXECUTOR_SUITABLE,
- VariableEnum.PROXY_FRONTEND_CONNECTION_LIMIT);
+ VariableEnum.PROXY_FRONTEND_MAX_CONNECTIONS);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/server.yaml
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/server.yaml
index a16eb5f..6f00eff 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/server.yaml
+++
b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/server.yaml
@@ -74,5 +74,5 @@
# # Available options of proxy backend executor suitable: OLAP(default),
OLTP. The OLTP option may reduce time cost of writing packets to client, but it
may increase the latency of SQL execution
# # if client connections are more than proxy-frontend-netty-executor-size,
especially executing slow SQL.
# proxy-backend-executor-suitable: OLAP
-# proxy-frontend-connection-limit: -1 # Proxy connection limit, less than 0
or equal 0 means no limit.
+# proxy-frontend-max-connections: 0 # Less than or equal to 0 means no
limitation.
# sql-federation-enabled: false
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionLimitContext.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionLimitContext.java
index bc5779d..12c68b35 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionLimitContext.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionLimitContext.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.proxy.frontend.connection;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Connection limit context.
@@ -32,7 +31,7 @@ import lombok.NoArgsConstructor;
public final class ConnectionLimitContext {
private static final ConnectionLimitContext INSTANCE = new
ConnectionLimitContext();
-
+
private final AtomicInteger activeConnections = new AtomicInteger();
/**
@@ -46,33 +45,35 @@ public final class ConnectionLimitContext {
/**
* Channel active state.
+ *
* @return Whether the connection can be established.
*/
- public boolean connect() {
- if (this.getConnectionLimit() <= 0) {
- return true;
- }
- if (this.activeConnections.incrementAndGet() <=
this.getConnectionLimit()) {
- return true;
- }
- return false;
+ public boolean connectionAllowed() {
+ return activeConnections.incrementAndGet() <= getMaxConnections() ||
!limitsMaxConnections();
}
/**
* Channel inactive state.
*/
- public void disconnect() {
- if (this.getConnectionLimit() <= 0) {
- return;
- }
- this.activeConnections.decrementAndGet();
+ public void connectionInactive() {
+ activeConnections.decrementAndGet();
+ }
+
+ /**
+ * Check whether the number of frontend connections is limited.
+ *
+ * @return limits max connections
+ */
+ public boolean limitsMaxConnections() {
+ return getMaxConnections() > 0;
}
/**
- * Connection limit size.
+ * Get connection limit size.
+ *
* @return limit size.
*/
- public int getConnectionLimit() {
- return
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_CONNECTION_LIMIT);
+ public int getMaxConnections() {
+ return
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_MAX_CONNECTIONS);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendConnectionLimitException.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendTooManyConnectionsException.java
similarity index 79%
rename from
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendConnectionLimitException.java
rename to
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendTooManyConnectionsException.java
index ff160e6..cf2cc66 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendConnectionLimitException.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/FrontendTooManyConnectionsException.java
@@ -17,17 +17,10 @@
package org.apache.shardingsphere.proxy.frontend.exception;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
/**
- * Frontend connection limit exception.
+ * Frontend too many connections exception.
*/
-@RequiredArgsConstructor
-@Getter
-public final class FrontendConnectionLimitException extends FrontendException {
+public final class FrontendTooManyConnectionsException extends
FrontendException {
private static final long serialVersionUID = -4397915988239251541L;
-
- private final String message;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelLimitationInboundHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelLimitationInboundHandler.java
index 758faef..cef2735 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelLimitationInboundHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelLimitationInboundHandler.java
@@ -17,14 +17,13 @@
package org.apache.shardingsphere.proxy.frontend.netty;
-import
org.apache.shardingsphere.proxy.frontend.connection.ConnectionLimitContext;
-import
org.apache.shardingsphere.proxy.frontend.exception.FrontendConnectionLimitException;
-import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
-
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.proxy.frontend.connection.ConnectionLimitContext;
+import
org.apache.shardingsphere.proxy.frontend.exception.FrontendTooManyConnectionsException;
+import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
/**
* Frontend channel limitation inbound handler.
@@ -32,22 +31,24 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class FrontendChannelLimitationInboundHandler extends
ChannelInboundHandlerAdapter {
-
+
private final DatabaseProtocolFrontendEngine
databaseProtocolFrontendEngine;
-
+
@Override
- public void channelActive(final ChannelHandlerContext ctx) throws
Exception {
- if (ConnectionLimitContext.getInstance().connect()) {
+ public void channelActive(final ChannelHandlerContext ctx) {
+ if (ConnectionLimitContext.getInstance().connectionAllowed()) {
ctx.fireChannelActive();
- } else {
- log.debug("Close channel {}, The server connections greater than
{}", ctx.channel().remoteAddress(),
ConnectionLimitContext.getInstance().getConnectionLimit());
-
ctx.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new
FrontendConnectionLimitException("The number of connections exceeds the
limit")));
- ctx.close();
+ return;
}
+ log.debug("Closing channel {} due to the number of server connections
has reached max connections {}", ctx.channel().remoteAddress(),
ConnectionLimitContext.getInstance().getMaxConnections());
+ // TODO This is not how actual databases behave and should be refactored.
+
ctx.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new
FrontendTooManyConnectionsException()));
+ ctx.close();
}
-
+
@Override
- public void channelInactive(final ChannelHandlerContext ctx) throws
Exception {
- ConnectionLimitContext.getInstance().disconnect();
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ ctx.fireChannelInactive();
+ ConnectionLimitContext.getInstance().connectionInactive();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index c3f5aa3..6ee3edd 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -64,12 +64,12 @@ public final class MySQLCommandExecuteEngine implements
CommandExecuteEngine {
public DatabasePacket<?> getErrorPacket(final Exception cause, final
BackendConnection backendConnection) {
return MySQLErrPacketFactory.newInstance(cause);
}
-
+
@Override
public DatabasePacket<?> getErrorPacket(final Exception cause) {
return MySQLErrPacketFactory.newInstance(cause);
}
-
+
@Override
public Optional<DatabasePacket<?>> getOtherPacket(final BackendConnection
backendConnection) {
return Optional.empty();
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
index b5175b6..846f044 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.proxy.frontend.mysql.err;
-import java.sql.SQLException;
-
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.db.protocol.error.CommonErrorCode;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerErrorCode;
import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
@@ -36,7 +36,7 @@ import
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactio
import
org.apache.shardingsphere.proxy.backend.exception.UnknownDatabaseException;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.exception.CommonDistSQLErrorCode;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.exception.CommonDistSQLException;
-import
org.apache.shardingsphere.proxy.frontend.exception.FrontendConnectionLimitException;
+import
org.apache.shardingsphere.proxy.frontend.exception.FrontendTooManyConnectionsException;
import
org.apache.shardingsphere.proxy.frontend.exception.UnsupportedCommandException;
import
org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
import
org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
@@ -44,8 +44,7 @@ import
org.apache.shardingsphere.sharding.route.engine.exception.NoSuchTableExce
import
org.apache.shardingsphere.sharding.route.engine.exception.TableExistsException;
import org.apache.shardingsphere.sql.parser.exception.SQLParsingException;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import java.sql.SQLException;
/**
* ERR packet factory for MySQL.
@@ -118,12 +117,12 @@ public final class MySQLErrPacketFactory {
if (cause instanceof ScalingJobNotFoundException) {
return new MySQLErrPacket(1,
CommonErrorCode.SCALING_JOB_NOT_EXIST, ((ScalingJobNotFoundException)
cause).getJobId());
}
+ if (cause instanceof FrontendTooManyConnectionsException) {
+ return new MySQLErrPacket(1,
CommonErrorCode.TOO_MANY_CONNECTIONS_EXCEPTION,
CommonErrorCode.TOO_MANY_CONNECTIONS_EXCEPTION.getErrorMessage());
+ }
if (cause instanceof RuntimeException) {
return new MySQLErrPacket(1, CommonErrorCode.RUNTIME_EXCEPTION,
cause.getMessage());
}
- if (cause instanceof FrontendConnectionLimitException) {
- return new MySQLErrPacket(1,
CommonErrorCode.TOO_MANY_CONNECTIONS_EXCEPTION,
CommonErrorCode.TOO_MANY_CONNECTIONS_EXCEPTION.getErrorMessage());
- }
return new MySQLErrPacket(1, CommonErrorCode.UNKNOWN_EXCEPTION,
cause.getMessage());
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/OpenGaussCommandExecuteEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/OpenGaussCommandExecuteEngine.java
index 1609036..e497693 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/OpenGaussCommandExecuteEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/OpenGaussCommandExecuteEngine.java
@@ -65,12 +65,12 @@ public final class OpenGaussCommandExecuteEngine implements
CommandExecuteEngine
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId()).getPendingExecutors().clear();
return OpenGaussErrorPacketFactory.newInstance(cause);
}
-
+
@Override
public DatabasePacket<?> getErrorPacket(final Exception cause) {
return OpenGaussErrorPacketFactory.newInstance(cause);
}
-
+
@Override
public Optional<DatabasePacket<?>> getOtherPacket(final BackendConnection
backendConnection) {
return
postgreSQLCommandExecuteEngine.getOtherPacket(backendConnection);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index b660fe8..6b888af 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -81,12 +81,12 @@ public final class PostgreSQLCommandExecuteEngine
implements CommandExecuteEngin
}
return PostgreSQLErrPacketFactory.newInstance(cause);
}
-
+
@Override
public DatabasePacket<?> getErrorPacket(final Exception cause) {
return PostgreSQLErrPacketFactory.newInstance(cause);
}
-
+
@Override
public Optional<DatabasePacket<?>> getOtherPacket(final BackendConnection
backendConnection) {
PostgreSQLConnectionContext connectionContext =
PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
index 533e2c8..81d494a 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
@@ -72,7 +72,7 @@ public interface CommandExecuteEngine {
* @return error packet
*/
DatabasePacket<?> getErrorPacket(Exception cause, BackendConnection
backendConnection);
-
+
/**
* Get error packet.
*
@@ -80,7 +80,7 @@ public interface CommandExecuteEngine {
* @return error packet
*/
DatabasePacket<?> getErrorPacket(Exception cause);
-
+
/**
* Get other packet.
*