Haoze Wu created HADOOP-17552:
---------------------------------

             Summary: The read method of 
hadoop.ipc.Client$Connection$PingInputStream may swallow 
java.net.SocketTimeoutException due to the mistaken usage of the rpcTimeout 
configuration
                 Key: HADOOP-17552
                 URL: https://issues.apache.org/jira/browse/HADOOP-17552
             Project: Hadoop Common
          Issue Type: Bug
          Components: ipc
    Affects Versions: 3.2.2
            Reporter: Haoze Wu


We are doing some systematic fault injection testing on Hadoop 3.2.2. When we run a client (e.g., `bin/hdfs dfs -ls /`) against our HDFS cluster (1 NameNode, 2 DataNodes), the client gets stuck forever. After some investigation, we believe this is a bug in `hadoop.ipc.Client`: the read method of `hadoop.ipc.Client$Connection$PingInputStream` keeps swallowing `java.net.SocketTimeoutException` because the `handleTimeout` method uses the `rpcTimeout` configuration incorrectly.

*Reproduction*

Start HDFS with the default configuration, then execute a client (we used the command `bin/hdfs dfs -ls /` in the terminal). While the NameNode is accepting the client's socket, inject a socket error (java.net.SocketException or java.io.IOException) at line 1402 of Server.java (line 1403 or 1404 also works).

We have prepared the reproduction scripts in a gist...

*Diagnosis*

When the NameNode tries to accept a client's socket, there are basically 4 steps:
 # accept the socket (line 1400)
 # configure the socket (lines 1402-1404)
 # hand the socket to a Reader (after line 1404)
 # swallow any possible IOException (line 1350)

 
{code:java}
//hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

    public void run() {
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {                         // line 1350
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // ...
        } catch (Exception e) {
          // ...
        }
      }
    }

    void doAccept(SelectionKey key) throws InterruptedException, IOException, 
        OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {           // line 1400

        channel.configureBlocking(false);                     // line 1402
        channel.socket().setTcpNoDelay(tcpNoDelay);           // line 1403
        channel.socket().setKeepAlive(true);                  // line 1404
        
        Reader reader = getReader();
        Connection c = connectionManager.register(channel,
            this.listenPort, this.isOnAuxiliaryPort);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          connectionManager.droppedConnections.getAndIncrement();
          continue;
        }
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }
{code}
When a SocketException occurs at line 1402 (or 1403 or 1404), the server.accept() at line 1400 has already completed, so we expect the following behavior:

 # The server (NameNode) accepts this connection but writes essentially nothing to it, because the connection is never registered with a Reader.
 # The client sees that the connection has been established and tries to read and write on it. After some timeout threshold, the client finds that it cannot read anything from this connection and exits with an exception or error (see the sketch after this list).
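A minimal, self-contained sketch of the behavior we expect in step 2 (plain java.net sockets, not the HDFS RPC protocol; all class and variable names here are ours): the server accepts the connection but never services it, and a client with a read timeout gets a SocketTimeoutException instead of hanging forever.
{code:java}
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class ExpectedTimeoutDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            // Server side: accept the socket but never read or write on it,
            // mimicking a connection that was never handed to a Reader.
            Thread acceptor = new Thread(() -> {
                try {
                    Socket accepted = server.accept();
                    Thread.sleep(Long.MAX_VALUE);     // hold the connection, never service it
                } catch (Exception ignored) {
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            // Client side: a read timeout (analogous to the expected 60 s RPC
            // timeout) should surface as a SocketTimeoutException.
            try (Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
                client.setSoTimeout(1000);
                try {
                    client.getInputStream().read();   // nothing will ever arrive
                } catch (SocketTimeoutException e) {
                    System.out.println("read timed out as expected: " + e);
                }
            }
        }
    }
}
{code}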

However, we do not observe step 2. The client just gets stuck forever (>10 min). We re-examined the default configuration in [https://hadoop.apache.org/docs/r3.2.2/hadoop-project-dist/hadoop-common/core-default.xml] and we believe the client should time out within 60 seconds, according to the parameters `ipc.client.rpc-timeout.ms`, `ipc.client.ping` and `ipc.ping.interval`.

The jstack dump shows where the client gets stuck:

 
{code:java}
"main" #1 prio=5 os_prio=0 tid=0x00007f5554019800 nid=0x4312 in Object.wait() [0x00007f555d62e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000071cc19ff0> (a org.apache.hadoop.ipc.Client$Call)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1551)
        - locked <0x000000071cc19ff0> (a org.apache.hadoop.ipc.Client$Call)
        at org.apache.hadoop.ipc.Client.call(Client.java:1508)
        at org.apache.hadoop.ipc.Client.call(Client.java:1405)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
        at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:910)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        - locked <0x000000071cb435b8> (a org.apache.hadoop.io.retry.RetryInvocationHandler$Call)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1671)
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1602)
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1599)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1614)
        at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:65)
        at org.apache.hadoop.fs.Globber.doGlob(Globber.java:294)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2050)
        at org.apache.hadoop.fs.shell.PathData.expandAsGlob(PathData.java:353)
        at org.apache.hadoop.fs.shell.Command.expandArgument(Command.java:250)
        at org.apache.hadoop.fs.shell.Command.expandArguments(Command.java:233)
        at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:104)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
        at org.apache.hadoop.fs.FsShell.main(FsShell.java:390)
{code}
According to org.apache.hadoop.ipc.Client.call(Client.java:1508), the runtime value of the timeout in org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1551) is -1, meaning that it waits forever. The only way to wake it up is through the callComplete method of Client$Call (Client.java:367), and callComplete is invoked from only 2 methods of Client$Call: setException and setRpcResponse.
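As a minimal sketch (assumed names and shapes, not the actual Client.java code), this is the wait/notify pattern described above: with a negative timeout the caller blocks indefinitely and can only be released when callComplete is reached through setException or setRpcResponse.
{code:java}
// Sketch only: a Call-like object whose waiter blocks forever when the
// timeout is negative, which is exactly the situation the jstack dump shows.
class CallSketch {
    private boolean done = false;
    private Exception error;

    synchronized void setException(Exception e) {   // one of the two ways in
        error = e;
        callComplete();
    }

    synchronized void setRpcResponse() {            // ...and the other
        callComplete();
    }

    private void callComplete() {
        done = true;
        notifyAll();                                // the only thing that wakes the waiter
    }

    synchronized void awaitCompletion(long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) {
            while (!done) {
                wait();                             // timeout == -1: wait forever
            }
        } else {
            long deadline = System.currentTimeMillis() + timeoutMs;
            long remaining = timeoutMs;
            while (!done && remaining > 0) {
                wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
        }
    }
}
{code}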

Our expectation is that setException will eventually be invoked once the client-side timeout fires. Below we explain the workflow that is supposed to handle this timeout and point out where the bug is.

The setException method should be invoked by the Client$Connection thread:

{code:java}
    @Override
    public void run() {
      // ...
      try {
        while (waitForWork()) {                       // line 1086
          receiveRpcResponse();                       // line 1087
        }
      } catch (Throwable t) {
        // ...
        markClosed(new IOException("Error reading responses", t));
      }
      close();                                        // line 1097
      // ...
    }

    private void receiveRpcResponse() {
      // ...
      try {
        ByteBuffer bb = ipcStreams.readResponse();    // line 1191
        // ...
      } catch (IOException e) {
        markClosed(e);                                // line 1235
      }
    }
{code}
When the timeout happens, the correct workflow is:

 # Before the I/O, the run method invokes receiveRpcResponse (line 1087), which then invokes readResponse (line 1191)
 # After the timeout, the readResponse invocation (line 1191) throws java.net.SocketTimeoutException
 # This exception is caught and handled by markClosed (line 1235)
 # waitForWork (line 1086) returns false because of markClosed's handling
 # The close method (line 1097) runs and finally invokes setException, which unblocks the org.apache.hadoop.fs.FsShell.main thread that is currently stuck (see the sketch after this list)
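A minimal sketch (assumed shapes, reusing the CallSketch class from the earlier sketch; not the actual Client$Connection internals) of how steps 3 through 5 are supposed to hand the timeout back to the stuck caller:
{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: markClosed records the exception (step 3), waitForWork then
// returns false (step 4), and close() delivers it via setException (step 5).
class ConnectionSketch {
    private final AtomicBoolean shouldCloseConnection = new AtomicBoolean(false);
    private IOException closeException;
    private final CallSketch call;                  // the Call the FsShell.main thread waits on

    ConnectionSketch(CallSketch call) {
        this.call = call;
    }

    void markClosed(IOException e) {                // step 3
        if (shouldCloseConnection.compareAndSet(false, true)) {
            closeException = e;
        }
    }

    boolean waitForWork() {                         // step 4
        return !shouldCloseConnection.get();
    }

    void close() {                                  // step 5
        call.setException(closeException);          // finally unblocks the waiting caller
    }
}
{code}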

The bug is within step 2. We confirmed that this Client$Connection thread keeps running the readResponse invocation (line 1191) forever without throwing any exception. Here is the jstack dump of this Client$Connection thread:

 
{code:java}
"IPC Client (1390869998) connection to /127.0.0.1:9000 from whz" #16 daemon prio=5 os_prio=0 tid=0x00007f555551e000 nid=0x432b runnable [0x00007f5524126000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x000000071ce57d28> (a sun.nio.ch.Util$3)
        - locked <0x000000071ce57ca0> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000000071ce578d8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        - locked <0x000000071ce85708> (a java.io.BufferedInputStream)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:562)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1881)
        at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1191)
        at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1087)
{code}
By investigating each level of this stack trace, we confirmed that a java.net.SocketTimeoutException should be thrown by org.apache.hadoop.net.SocketIOWithTimeout.doIO ([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java#L164]), but this exception is swallowed by org.apache.hadoop.ipc.Client$Connection$PingInputStream.read:

{code:java}
      public int read() throws IOException {
        int waiting = 0;
        do {
          try {
            return super.read();                // appears in the stack trace above
          } catch (SocketTimeoutException e) {
            waiting += soTimeout;
            handleTimeout(e, waiting);          // line 565
          }
        } while (true);                         // line 567
      }
{code}
The handleTimeout invocation (line 565) should rethrow this SocketTimeoutException; otherwise the infinite loop indicated by line 567 runs forever.

However, the handleTimeout method fails to take the branch that rethrows the exception:
{code:java}
     /* Process timeout exception
       * if the connection is not going to be closed or 
       * the RPC is not timed out yet, send a ping.
       */
      private void handleTimeout(SocketTimeoutException e, int waiting)
          throws IOException {
        if (shouldCloseConnection.get() || !running.get() ||
            (0 < rpcTimeout && rpcTimeout <= waiting)) {            // line 545
          throw e;
        } else {
          sendPing();                                               // line 548
        }
      }
{code}
Instead, it goes to line 548 and sends a PING, which succeeds because the connection itself is not broken. In the correct behavior, the condition at line 545 would evaluate to true and the exception would be rethrown.

Line 545 evaluates to false because the rpcTimeout variable (default value 0, set by `ipc.client.rpc-timeout.ms` in the default configuration [https://hadoop.apache.org/docs/r3.2.2/hadoop-project-dist/hadoop-common/core-default.xml]) seems to be used incorrectly in handleTimeout. According to the description of `ipc.client.rpc-timeout.ms` in the default configuration [https://hadoop.apache.org/docs/r3.2.2/hadoop-project-dist/hadoop-common/core-default.xml], "If ipc.client.ping is set to true and this rpc-timeout is greater than the value of ipc.ping.interval, the effective value of the rpc-timeout is rounded up to multiple of ipc.ping.interval."
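To make the arithmetic concrete, here is a small standalone simulation (our own code, not Hadoop's; it assumes the default ping interval of 60 seconds and the default rpcTimeout of 0) of the check at line 545 across repeated read timeouts:
{code:java}
public class SwallowedTimeoutDemo {
    public static void main(String[] args) {
        final int soTimeout = 60_000;   // assumed: per-read timeout = ipc.ping.interval default
        final int rpcTimeout = 0;       // assumed: ipc.client.rpc-timeout.ms default

        int waiting = 0;
        for (int attempt = 1; attempt <= 5; attempt++) {
            waiting += soTimeout;       // every super.read() times out in this scenario
            boolean rethrow = (0 < rpcTimeout && rpcTimeout <= waiting);   // line 545's check
            System.out.printf("attempt %d: waiting=%d ms, rethrow=%b%n",
                attempt, waiting, rethrow);
        }
        // With rpcTimeout == 0 the condition can never become true, so
        // PingInputStream.read() keeps pinging and retrying forever.
    }
}
{code}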

The rpcTimeout is used correctly in the constructor of the Client$Connection class:
{code:java}
    Connection(ConnectionId remoteId, int serviceClass,
        Consumer<Connection> removeMethod) {
      // ...
      if (rpcTimeout > 0) {
        // effective rpc timeout is rounded up to multiple of pingInterval
        // if pingInterval < rpcTimeout.
        this.soTimeout = (doPing && pingInterval < rpcTimeout) ?
            pingInterval : rpcTimeout;
      } else {
        this.soTimeout = pingInterval;
      }
      // ...
    }
{code}
We have confirmed that if the handleTimeout method uses the this.soTimeout value prepared by this constructor, then this bug is fixed.

*Fix*

We propose modifying line 545 of the handleTimeout method of the Client$Connection class so that rpcTimeout is used in the same way the constructor of Client$Connection handles it.



