[ https://issues.apache.org/jira/browse/HADOOP-18024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ayush Saxena reopened HADOOP-18024:
-----------------------------------

> SocketChannel is not closed when IOException happens in Server$Listener.doAccept
> --------------------------------------------------------------------------------
>
>                Key: HADOOP-18024
>                URL: https://issues.apache.org/jira/browse/HADOOP-18024
>            Project: Hadoop Common
>         Issue Type: Bug
>         Components: ipc
>   Affects Versions: 3.2.2
>           Reporter: Haoze Wu
>           Assignee: Haoze Wu
>           Priority: Major
>             Labels: pull-request-available
>            Fix For: 3.4.0
>
>         Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> This is a follow-up to HADOOP-17552.
> When the symptom described in HADOOP-17552 occurs, the client may only time out after 2 minutes, as determined by the default RPC timeout configuration specified in HADOOP-17552. Until that timeout expires, the client simply waits and has no way of knowing that the problem has occurred.
> However, we recently found that the client does not need to waste these 2 minutes, and that the server's availability can also be improved. If an IOException is thrown at line 1402, 1403, or 1404, we can simply close the problematic `SocketChannel` and continue accepting new socket connections. The client can then detect the closed socket immediately, instead of waiting 2 minutes.
> The old implementation:
> {code:java}
> //hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
> public void run() {
>   while (running) {
>     // ...
>     try {
>       // ...
>       while (iter.hasNext()) {
>         // ...
>         try {
>           if (key.isValid()) {
>             if (key.isAcceptable())
>               doAccept(key); // line 1348
>           }
>         } catch (IOException e) { // line 1350
>         }
>         // ...
>       }
>     } catch (OutOfMemoryError e) {
>       // ...
>     } catch (Exception e) {
>       // ...
>     }
>   }
> }{code}
> {code:java}
> //hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
> void doAccept(SelectionKey key) throws InterruptedException, IOException,
>     OutOfMemoryError {
>   ServerSocketChannel server = (ServerSocketChannel) key.channel();
>   SocketChannel channel;
>   while ((channel = server.accept()) != null) { // line 1400
>     channel.configureBlocking(false); // line 1402
>     channel.socket().setTcpNoDelay(tcpNoDelay); // line 1403
>     channel.socket().setKeepAlive(true); // line 1404
>     Reader reader = getReader();
>     Connection c = connectionManager.register(channel,
>         this.listenPort, this.isOnAuxiliaryPort);
>     // If the connectionManager can't take it, close the connection.
>     if (c == null) {
>       if (channel.isOpen()) {
>         IOUtils.cleanup(null, channel);
>       }
>       connectionManager.droppedConnections.getAndIncrement();
>       continue;
>     }
>     key.attach(c); // so closeCurrentConnection can get the object
>     reader.addConnection(c);
>   }
> }{code}
>
> We propose the following implementation instead:
> {code:java}
> void doAccept(SelectionKey key) throws InterruptedException, IOException,
>     OutOfMemoryError {
>   ServerSocketChannel server = (ServerSocketChannel) key.channel();
>   SocketChannel channel;
>   while ((channel = server.accept()) != null) { // line 1400
>     try {
>       channel.configureBlocking(false); // line 1402
>       channel.socket().setTcpNoDelay(tcpNoDelay); // line 1403
>       channel.socket().setKeepAlive(true); // line 1404
>     } catch (IOException e) {
>       LOG.warn(...);
>       try {
>         channel.socket().close();
>         channel.close();
>       } catch (IOException ignored) { }
>       continue;
>     }
>     // ...
>   }
> }{code}
> The advantages include:
> # {*}In the old implementation{*}, the `doAccept` loop over the `ServerSocketChannel` is abandoned because of a single exception from a single `SocketChannel`, since the only exception handler is at line 1350.
> {*}In the new implementation{*}, a try-catch handles the exception thrown at line 1402, 1403, or 1404, so the `ServerSocketChannel` can continue accepting new connections and does not need to go back to line 1348 in the next iteration of the while loop in the `run` method.
> # {*}In the old implementation{*}, the client (the other endpoint of this `SocketChannel`) is not aware of the issue, because the `SocketChannel` has been accepted but is never closed. {*}In the new implementation{*}, we close the `SocketChannel` when the IOException happens, so the client immediately gets EOF from the socket. The client can then choose to retry or throw an exception, at its own discretion.
>
> We have confirmed on our local machine that this patch works as expected.
>
> This code pattern has been adopted by other communities. For example, in Kafka [https://github.com/apache/kafka/blob/23e9818e625976c22fe6d4297a5ab76b01f92ef6/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
> {code:java}
> /**
>  * Accept a new connection
>  */
> private def accept(key: SelectionKey): Option[SocketChannel] = {
>   val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>   val socketChannel = serverSocketChannel.accept()
>   try {
>     connectionQuotas.inc(endPoint.listenerName,
>       socketChannel.socket.getInetAddress, blockedPercentMeter)
>     configureAcceptedSocketChannel(socketChannel)
>     Some(socketChannel)
>   } catch {
>     case e: TooManyConnectionsException =>
>       info(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>     case e: ConnectionThrottledException =>
>       // ...
>       None
>     case e: IOException =>
>       error(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>   }
> }
> /**
>  * Close `channel` and decrement the connection count.
>  */
> def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>   if (channel != null) {
>     // ...
>     closeSocket(channel)
>   }
> }
> protected def closeSocket(channel: SocketChannel): Unit = {
>   CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>   CoreUtils.swallow(channel.close(), this, Level.ERROR)
> }
> {code}
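The behaviour proposed in HADOOP-18024 can be illustrated with a small, self-contained `java.nio` program. This is a sketch under stated assumptions, not the Hadoop patch. `AcceptCloseSketch`, `ChannelConfigurer`, `Result` and `demo` are hypothetical names. The IOException from lines 1402 to 1404 is simulated by a configurer that fails for the first accepted connection. The selector loop only loosely mirrors `Listener.run` and `doAccept`; Hadoop's `Reader` and `ConnectionManager` are left out. The program exercises both advantages listed in the issue. The accept loop keeps running after the failure. The client whose channel was closed also sees EOF (`read` returns -1) at once, without waiting for an RPC timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Hadoop's Server$Listener; every name here is illustrative.
final class AcceptCloseSketch {
  /** Stands in for configureBlocking/setTcpNoDelay/setKeepAlive (lines 1402-1404). */
  interface ChannelConfigurer {
    void configure(SocketChannel channel) throws IOException;
  }

  static final class Result {
    int rejected; // channels closed after a configuration failure
    final List<SocketChannel> kept = new ArrayList<>(); // channels handed on
  }

  /** Drains pending connections; a channel that fails configuration is closed and skipped. */
  static void doAccept(ServerSocketChannel server, ChannelConfigurer configurer,
      Result result) throws IOException {
    SocketChannel channel;
    while ((channel = server.accept()) != null) {
      try {
        configurer.configure(channel);
      } catch (IOException e) {
        System.err.println("Closing " + channel + ": " + e.getMessage());
        try {
          channel.socket().close();
          channel.close();
        } catch (IOException ignored) {
          // Best-effort close; the connection is dropped either way.
        }
        result.rejected++;
        continue; // Keep accepting instead of letting the exception escape.
      }
      result.kept.add(channel);
    }
  }

  /** Returns {rejected, kept, firstClientRead, secondClientRead}. */
  static int[] demo() {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    try (ServerSocketChannel server = ServerSocketChannel.open();
         Selector selector = Selector.open()) {
      server.bind(new InetSocketAddress(loopback, 0));
      server.configureBlocking(false);
      server.register(selector, SelectionKey.OP_ACCEPT);
      InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
      // Blocking connects complete via the listen backlog, before accept() is called.
      try (SocketChannel first = SocketChannel.open(address);
           SocketChannel second = SocketChannel.open(address)) {
        int[] calls = {0};
        ChannelConfigurer configurer = ch -> {
          if (calls[0]++ == 0) { // simulated failure for the first accepted channel
            throw new IOException("simulated setsockopt failure");
          }
          ch.configureBlocking(false);
          ch.socket().setTcpNoDelay(true);
          ch.socket().setKeepAlive(true);
        };
        Result result = new Result();
        while (result.rejected + result.kept.size() < 2) {
          selector.select(); // block until a connection is pending
          selector.selectedKeys().clear();
          doAccept(server, configurer, result);
        }
        // The kept connection still works: the server writes, the client reads.
        result.kept.get(0).write(ByteBuffer.wrap("ok".getBytes(StandardCharsets.US_ASCII)));
        int firstRead = first.read(ByteBuffer.allocate(16));   // -1: EOF, no timeout
        int secondRead = second.read(ByteBuffer.allocate(16)); // bytes written above
        for (SocketChannel keptChannel : result.kept) {
          keptChannel.close();
        }
        return new int[] {result.rejected, result.kept.size(), firstRead, secondRead};
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] r = demo();
    System.out.printf("rejected=%d kept=%d firstRead=%d secondRead=%d%n", r[0], r[1], r[2], r[3]);
  }
}
```

Running `main` prints the four counts. The try-catch wraps only the per-connection configuration calls. A failure therefore costs one connection rather than the rest of the accept loop. Closing that channel turns a silent hang on the client side into an immediate EOF.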