This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 36b45b570b0 [fix](mysql) fix mysql channel infinite blocking (#28808)
36b45b570b0 is described below
commit 36b45b570b0c16ac0a4cbe8f05e6a7ea0b946851
Author: fornaix <[email protected]>
AuthorDate: Fri Dec 29 13:57:22 2023 +0800
[fix](mysql) fix mysql channel infinite blocking (#28808)
Call the Channels blocking method with timeout instead.
Using session variables net_write_timeout and net_read_timeout as the
timeout parameter.
---
.../src/main/java/org/apache/doris/mysql/MysqlChannel.java | 14 ++++++++++----
.../src/main/java/org/apache/doris/qe/ConnectContext.java | 10 +++++++++-
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index 5eaee47fa4b..8e7c5f79ffd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -31,6 +31,7 @@ import org.xnio.channels.Channels;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
@@ -79,6 +80,8 @@ public class MysqlChannel {
// mysql flag CLIENT_DEPRECATE_EOF
private boolean clientDeprecatedEOF;
+ private ConnectContext context;
+
protected MysqlChannel() {
// For DummyMysqlChannel
}
@@ -91,7 +94,7 @@ public class MysqlChannel {
return clientDeprecatedEOF;
}
- public MysqlChannel(StreamConnection connection) {
+ public MysqlChannel(StreamConnection connection, ConnectContext context) {
Preconditions.checkNotNull(connection);
this.sequenceId = 0;
this.isSend = false;
@@ -113,6 +116,7 @@ public class MysqlChannel {
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
+ this.context = context;
}
public void initSslBuffer() {
@@ -195,7 +199,8 @@ public class MysqlChannel {
}
try {
while (dstBuf.remaining() != 0) {
- int ret = Channels.readBlocking(conn.getSourceChannel(),
dstBuf);
+ int ret = Channels.readBlocking(conn.getSourceChannel(),
dstBuf, context.getNetReadTimeout(),
+ TimeUnit.SECONDS);
// return -1 when remote peer close the channel
if (ret == -1) {
decryptData(dstBuf, isHeader);
@@ -365,12 +370,13 @@ public class MysqlChannel {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
- long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
+ long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer,
context.getNetWriteTimeout(),
+ TimeUnit.SECONDS);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" +
writeLen
+ ", needToWrite=" + bufLen + "]");
}
- Channels.flushBlocking(conn.getSinkChannel());
+ Channels.flushBlocking(conn.getSinkChannel(),
context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index a3590a23c4b..8a1a16999b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -327,7 +327,7 @@ public class ConnectContext {
connectType = ConnectType.MYSQL;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
if (connection != null) {
- mysqlChannel = new MysqlChannel(connection);
+ mysqlChannel = new MysqlChannel(connection, this);
} else {
mysqlChannel = new DummyMysqlChannel();
}
@@ -1033,5 +1033,13 @@ public class ConnectContext {
public void setSkipAuth(boolean skipAuth) {
this.skipAuth = skipAuth;
}
+
+ public int getNetReadTimeout() {
+ return this.sessionVariable.getNetReadTimeout();
+ }
+
+ public int getNetWriteTimeout() {
+ return this.sessionVariable.getNetWriteTimeout();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]