[ https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513673#comment-17513673 ]
Wenzhe Zhou commented on HADOOP-15720:
--------------------------------------

Hi Hadoop team, is anyone actively working on this issue? Any ETA?

> rpcTimeout may not have been applied correctly
> ----------------------------------------------
>
>                 Key: HADOOP-15720
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15720
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Priority: Major
>
> org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously
> over the same connection, in the following synchronized code block:
> {code:java}
> synchronized (sendRpcRequestLock) {
>   Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>     @Override
>     public void run() {
>       try {
>         synchronized (Connection.this.out) {
>           if (shouldCloseConnection.get()) {
>             return;
>           }
>
>           if (LOG.isDebugEnabled()) {
>             LOG.debug(getName() + " sending #" + call.id
>                 + " " + call.rpcRequest);
>           }
>
>           byte[] data = d.getData();
>           int totalLength = d.getLength();
>           out.writeInt(totalLength); // Total Length
>           out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
>           out.flush();
>         }
>       } catch (IOException e) {
>         // exception at this point would leave the connection in an
>         // unrecoverable state (eg half a call left on the wire).
>         // So, close the connection, killing any outstanding calls
>         markClosed(e);
>       } finally {
>         // the buffer is just an in-memory buffer, but it is still polite to
>         // close early
>         IOUtils.closeStream(d);
>       }
>     }
>   });
>
>   try {
>     senderFuture.get();
>   } catch (ExecutionException e) {
>     Throwable cause = e.getCause();
>
>     // cause should only be a RuntimeException as the Runnable above
>     // catches IOException
>     if (cause instanceof RuntimeException) {
>       throw (RuntimeException) cause;
>     } else {
>       throw new RuntimeException("unexpected checked exception", cause);
>     }
>   }
> }
> {code}
> It then waits for the results asynchronously via:
> {code:java}
> /* Receive a response.
>  * Because only one receiver, so no synchronization on in.
>  */
> private void receiveRpcResponse() {
>   if (shouldCloseConnection.get()) {
>     return;
>   }
>   touch();
>
>   try {
>     int totalLen = in.readInt();
>     RpcResponseHeaderProto header =
>         RpcResponseHeaderProto.parseDelimitedFrom(in);
>     checkResponse(header);
>     int headerLen = header.getSerializedSize();
>     headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
>     int callId = header.getCallId();
>     if (LOG.isDebugEnabled())
>       LOG.debug(getName() + " got value #" + callId);
>     Call call = calls.get(callId);
>     RpcStatusProto status = header.getStatus();
>     ......
> {code}
> However, the {{call}} looked up in {{receiveRpcResponse()}} above may be any of
> the outstanding calls: responses can arrive in any order.
>
> The following code
> {code:java}
> int totalLen = in.readInt();
> {code}
> eventually calls one of the following two methods, in which rpcTimeout is
> checked (via {{handleTimeout(e, waiting)}}):
> {code:java}
> /** Read a byte from the stream.
>  * Send a ping if timeout on read. Retries if no failure is detected
>  * until a byte is read.
>  * @throws IOException for any IO problem other than socket timeout
>  */
> @Override
> public int read() throws IOException {
>   int waiting = 0;
>   do {
>     try {
>       return super.read();
>     } catch (SocketTimeoutException e) {
>       waiting += soTimeout;
>       handleTimeout(e, waiting);
>     }
>   } while (true);
> }
>
> /** Read bytes into a buffer starting from offset <code>off</code>
>  * Send a ping if timeout on read. Retries if no failure is detected
>  * until a byte is read.
>  *
>  * @return the total number of bytes read; -1 if the connection is closed.
>  */
> @Override
> public int read(byte[] buf, int off, int len) throws IOException {
>   int waiting = 0;
>   do {
>     try {
>       return super.read(buf, off, len);
>     } catch (SocketTimeoutException e) {
>       waiting += soTimeout;
>       handleTimeout(e, waiting);
>     }
>   } while (true);
> }
> {code}
> However, {{waiting}} is reset to 0 on every invocation of these read methods, so
> each read can block for up to rpcTimeout on its own, and the real time before a
> call times out appears to be cumulative.
>
> For example, suppose the client issues call1 and call2 and then waits for the
> results. If call1 takes (rpcTimeout - 1), it does not time out; if call2 then
> takes another (rpcTimeout - 1), it does not time out either. Yet call2 has
> effectively waited 2*(rpcTimeout - 1), nearly twice rpcTimeout, and should
> have timed out.
> In the worst case, an RPC may take indefinitely long without ever timing out.
>
> It seems more accurate to record the time at which an RPC is sent to the
> server, and then check for the timeout here:
> {code:java}
> public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>     ConnectionId remoteId, int serviceClass,
>     AtomicBoolean fallbackToSimpleAuth) throws IOException {
>   final Call call = createCall(rpcKind, rpcRequest);
>   Connection connection = getConnection(remoteId, call, serviceClass,
>       fallbackToSimpleAuth);
>   try {
>     connection.sendRpcRequest(call); // send the rpc request
>   } catch (RejectedExecutionException e) {
>     throw new IOException("connection has been closed", e);
>   } catch (InterruptedException e) {
>     Thread.currentThread().interrupt();
>     LOG.warn("interrupted waiting to send rpc request to server", e);
>     throw new IOException(e);
>   }
>   synchronized (call) {
>     while (!call.done) {
>       try {
>         call.wait(); // wait for the result
>       } catch (InterruptedException ie) {
>         Thread.currentThread().interrupt();
>         throw new InterruptedIOException("Call interrupted");
>       } // <= should check how long it has waited here; time out if rpcTimeout has been reached
>     }
>     if (call.error != null) {
>       if (call.error instanceof RemoteException) {
>         call.error.fillInStackTrace();
>         throw call.error;
>       } else { // local exception
>         InetSocketAddress address = connection.getRemoteAddress();
>         throw NetUtils.wrapException(address.getHostName(),
>             address.getPort(),
>             NetUtils.getHostname(),
>             0,
>             call.error);
>       }
>     } else {
>       return call.getRpcResponse();
>     }
>   }
> }
> {code}
> Basically, we should change the call highlighted above from
> {code:java}
> public final void wait() throws InterruptedException
> {code}
> to
> {code:java}
> public final void wait(long timeout, int nanos) throws InterruptedException
> {code}
> and pass rpcTimeout as the timeout value (note that this ignores the time needed
> to send the RPC to the server; ideally that should be included too, so that
> rpcTimeout means what it is intended to mean).
>
> Hi [~daryn] and [~kihwal], would you please take a look at the analysis above
> and let me know if I have misunderstood anything?
> Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
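A minimal sketch of the deadline-based wait proposed in the issue, for illustration only. It is not Hadoop code: TimedCall, complete() and await() are hypothetical stand-ins for Client.Call, the receiver thread completing a call, and the wait loop in Client.call(). The deadline is fixed once when waiting starts, and each wakeup waits only for the remaining time, because passing the full rpcTimeout to every wait() inside the while loop would restart the budget after each wakeup, much like the per-read waiting counter does. The sketch uses Object.wait(long) rather than wait(long, int), assumes millisecond precision is enough, treats rpcTimeoutMs <= 0 as "no timeout" (an assumption), and, like the proposal, does not count the time spent sending the request.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for org.apache.hadoop.ipc.Client.Call: holds a
 * response that another thread (the connection's receiver) fills in.
 * Illustrative sketch only, not Hadoop's actual Call class.
 */
class TimedCall {
  private boolean done;
  private Object response;

  /** Called by the receiver thread when the response for this call arrives. */
  synchronized void complete(Object value) {
    response = value;
    done = true;
    notifyAll(); // wake the caller blocked in await()
  }

  /**
   * Waits for the response and fails once rpcTimeoutMs has elapsed since this
   * method was entered. The deadline is computed once, so spurious wakeups or
   * repeated waits cannot restart the budget. Assumption: rpcTimeoutMs <= 0
   * means "no timeout", mirroring the plain call.wait() loop.
   */
  synchronized Object await(long rpcTimeoutMs) throws InterruptedIOException {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
    while (!done) {
      long waitMs = 0; // Object.wait(0) blocks until notified
      if (rpcTimeoutMs > 0) {
        long remainingNanos = deadline - System.nanoTime();
        if (remainingNanos <= 0) {
          throw new SocketTimeoutException(
              "RPC response not received within " + rpcTimeoutMs + " ms");
        }
        // Truncating to millis can yield 0, and wait(0) would block forever,
        // so always wait at least 1 ms and re-check the deadline afterwards.
        waitMs = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
      }
      try {
        wait(waitMs);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Call interrupted");
      }
    }
    return response;
  }
}
```

With this shape, the receiver thread calls complete() when the matching response arrives, and the caller gets a SocketTimeoutException (a subclass of InterruptedIOException) once rpcTimeout has elapsed since it started waiting, no matter how many other responses were read in between.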