Haoze Wu created ZOOKEEPER-4424:
-----------------------------------
Summary: Re-throwing IOException in
Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler#acceptConnections is not
always needed
Key: ZOOKEEPER-4424
URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4424
Project: ZooKeeper
Issue Type: Bug
Components: server
Affects Versions: 3.6.2
Reporter: Haoze Wu
When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` accepts a new
socket connection, it may throw an IOException at line 510, 514, 515, or 517.
The IOException at line 510 is discussed in ZOOKEEPER-4203 and
[https://github.com/apache/zookeeper/pull/1596]; it triggers a concurrency
bug. However, if the IOException occurs at line 514, 515, or 517, we can avoid
this complicated process entirely: simply catch the IOException and proceed to
accept the next socket connection, without exiting the
LearnerCnxAcceptorHandler thread. An exception in those operations (e.g.,
`setSoTimeout`) only indicates a problem with that one socket connection; the
`ServerSocket` itself is still usable. Therefore, the
`LearnerCnxAcceptorHandler` can keep accepting socket connections with
this `ServerSocket`.
{code:java}
//zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
private void acceptConnections() throws IOException {
    Socket socket = null;
    boolean error = false;
    try {
        socket = serverSocket.accept(); // line 510
        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        socket.setSoTimeout(self.tickTime * self.initLimit); // line 514
        socket.setTcpNoDelay(nodelay); // line 515
        BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); // line 517
        LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
        fh.start();
    } catch (SocketException e) {
        error = true;
        if (stop.get()) {
            LOG.warn("Exception while shutting down acceptor.", e);
        } else {
            throw e;
        }
    } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
    } catch (Exception e) {
        error = true;
        throw e;
    } finally {
        // Don't leak sockets on errors
        if (error && socket != null && !socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Error closing socket: " + socket, e);
            }
        }
    }
}
{code}
We propose the following implementation instead. Its advantage is that an
IOException on an individual socket will not force the
`Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` thread to exit, which
avoids the overhead of re-election and potential concurrency bugs such as
ZOOKEEPER-4203.
{code:java}
private void acceptConnections() throws IOException {
    Socket socket = null;
    boolean error = false;
    try {
        socket = serverSocket.accept();
        // declared outside the inner try so it is in scope for LearnerHandler
        BufferedInputStream is;
        try {
            // start with the initLimit, once the ack is processed
            // in LearnerHandler switch to the syncLimit
            socket.setSoTimeout(self.tickTime * self.initLimit);
            socket.setTcpNoDelay(nodelay);
            is = new BufferedInputStream(socket.getInputStream());
        } catch (IOException e) {
            // only this connection is broken; the ServerSocket is still usable
            LOG.warn("Error configuring accepted socket: " + socket, e);
            error = true;
            return; // close the socket at the finally block
        }
        LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
        fh.start();
    } catch (SocketException e) {
        error = true;
        if (stop.get()) {
            LOG.warn("Exception while shutting down acceptor.", e);
        } else {
            throw e;
        }
    } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true; // as in the current code, so the socket is closed below
    } catch (Exception e) {
        error = true;
        throw e;
    } finally {
        // Don't leak sockets on errors
        if (error && socket != null && !socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Error closing socket: " + socket, e);
            }
        }
    }
}
{code}
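The premise that a failure while configuring one accepted socket leaves the
`ServerSocket` usable can be checked outside ZooKeeper. The following is only
a minimal sketch under stated assumptions, not ZooKeeper code: the class
`TolerantAcceptDemo`, the `SocketSetup` hook, and the `acceptOne` helper are
hypothetical names introduced for illustration. The first connection's setup
is made to fail by closing the socket before calling `setSoTimeout`, which
throws a SocketException; the second connection is then accepted on the same
`ServerSocket`.
{code:java}
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class TolerantAcceptDemo {

    /** Hypothetical hook standing in for the per-connection setup calls. */
    interface SocketSetup {
        void apply(Socket socket) throws IOException;
    }

    /**
     * Accepts one connection and runs the setup on it. Returns the socket,
     * or null if the setup failed (the socket is closed in that case).
     * An IOException from accept() itself still propagates to the caller.
     */
    static Socket acceptOne(ServerSocket serverSocket, SocketSetup setup) throws IOException {
        Socket socket = serverSocket.accept();
        try {
            setup.apply(socket);
            return socket;
        } catch (IOException e) {
            // Only this connection is affected; do not leak it.
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing more to do for this socket
            }
            return null;
        }
    }

    /** Returns {first setup failed, second connection accepted and open}. */
    static boolean[] run() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            server.setSoTimeout(5000); // keep accept() from blocking forever
            int port = server.getLocalPort();
            // Both clients connect via the listen backlog before accept() runs.
            try (Socket c1 = new Socket(loopback, port);
                 Socket c2 = new Socket(loopback, port)) {
                // Simulated failure: setSoTimeout on a closed socket throws SocketException.
                Socket first = acceptOne(server, s -> {
                    s.close();
                    s.setSoTimeout(1000);
                });
                // Normal setup, mirroring the calls at lines 514, 515, and 517.
                Socket second = acceptOne(server, s -> {
                    s.setSoTimeout(1000);
                    s.setTcpNoDelay(true);
                    new BufferedInputStream(s.getInputStream());
                });
                boolean secondOk = second != null && !second.isClosed();
                if (second != null) {
                    second.close();
                }
                return new boolean[] {first == null, secondOk};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = run();
        System.out.println("first setup failed: " + r[0]);
        System.out.println("second connection accepted: " + r[1]);
    }
}
{code}
In this sketch, only a failure of `accept()` itself reaches the caller, which
corresponds to the line 510 case that still needs the handling discussed in
ZOOKEEPER-4203.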
This code pattern has been adopted by other communities. For example, in Kafka
[https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
{code:scala}
/**
 * Accept a new connection
 */
private def accept(key: SelectionKey): Option[SocketChannel] = {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    configureAcceptedSocketChannel(socketChannel)
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(...)
      close(endPoint.listenerName, socketChannel)
      None
    case e: ConnectionThrottledException =>
      // ...
      None
    case e: IOException =>
      error(...)
      close(endPoint.listenerName, socketChannel)
      None
  }
}

/**
 * Close `channel` and decrement the connection count.
 */
def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
  if (channel != null) {
    // ...
    closeSocket(channel)
  }
}

protected def closeSocket(channel: SocketChannel): Unit = {
  CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
  CoreUtils.swallow(channel.close(), this, Level.ERROR)
}
{code}