[ 
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513673#comment-17513673
 ] 

Wenzhe Zhou commented on HADOOP-15720:
--------------------------------------

Hi Hadoop team, is anyone actively working on this issue? Any ETA?

> rpcTimeout may not have been applied correctly
> ----------------------------------------------
>
>                 Key: HADOOP-15720
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15720
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Priority: Major
>
> org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously 
> via the same connection, as in the following synchronized code block:
> {code:java}
>       synchronized (sendRpcRequestLock) {
>         Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>           @Override
>           public void run() {
>             try {
>               synchronized (Connection.this.out) {
>                 if (shouldCloseConnection.get()) {
>                   return;
>                 }
>                 
>                 if (LOG.isDebugEnabled()) {
>                   LOG.debug(getName() + " sending #" + call.id
>                       + " " + call.rpcRequest);
>                 }
>          
>                 byte[] data = d.getData();
>                 int totalLength = d.getLength();
>                 out.writeInt(totalLength); // Total Length
>                 out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
>                 out.flush();
>               }
>             } catch (IOException e) {
>               // exception at this point would leave the connection in an
>               // unrecoverable state (eg half a call left on the wire).
>               // So, close the connection, killing any outstanding calls
>               markClosed(e);
>             } finally {
>               // the buffer is just an in-memory buffer, but it is still
>               // polite to close early
>               IOUtils.closeStream(d);
>             }
>           }
>         });
>       
>         try {
>           senderFuture.get();
>         } catch (ExecutionException e) {
>           Throwable cause = e.getCause();
>           
>           // cause should only be a RuntimeException as the Runnable above
>           // catches IOException
>           if (cause instanceof RuntimeException) {
>             throw (RuntimeException) cause;
>           } else {
>             throw new RuntimeException("unexpected checked exception", cause);
>           }
>         }
>       }
> {code}
> It then waits for the result asynchronously via
> {code:java}
>     /* Receive a response.
>      * Because only one receiver, so no synchronization on in.
>      */
>     private void receiveRpcResponse() {
>       if (shouldCloseConnection.get()) {
>         return;
>       }
>       touch();
>       
>       try {
>         int totalLen = in.readInt();
>         RpcResponseHeaderProto header = 
>             RpcResponseHeaderProto.parseDelimitedFrom(in);
>         checkResponse(header);
>         int headerLen = header.getSerializedSize();
>         headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
>         int callId = header.getCallId();
>         if (LOG.isDebugEnabled())
>           LOG.debug(getName() + " got value #" + callId);
>         Call call = calls.get(callId);
>         RpcStatusProto status = header.getStatus();
> ......
> {code}
> However, we can see that the {{call}} retrieved in {{receiveRpcResponse()}} 
> above may be in any order.
> The following code
> {code:java}
>         int totalLen = in.readInt();
> {code}
> eventually calls one of the following two methods, where rpcTimeout is 
> checked:
> {code:java}
>       /** Read a byte from the stream.
>        * Send a ping if timeout on read. Retries if no failure is detected
>        * until a byte is read.
>        * @throws IOException for any IO problem other than socket timeout
>        */
>       @Override
>       public int read() throws IOException {
>         int waiting = 0;
>         do {
>           try {
>             return super.read();
>           } catch (SocketTimeoutException e) {
>             waiting += soTimeout;
>             handleTimeout(e, waiting);
>           }
>         } while (true);
>       }
>       /** Read bytes into a buffer starting from offset <code>off</code>
>        * Send a ping if timeout on read. Retries if no failure is detected
>        * until a byte is read.
>        * 
>        * @return the total number of bytes read; -1 if the connection is
>        * closed.
>        */
>       @Override
>       public int read(byte[] buf, int off, int len) throws IOException {
>         int waiting = 0;
>         do {
>           try {
>             return super.read(buf, off, len);
>           } catch (SocketTimeoutException e) {
>             waiting += soTimeout;
>             handleTimeout(e, waiting);
>           }
>         } while (true);
>       }
> {code}
> But the waiting time is initialized to 0 for each of the above read 
> calls, so each read can take up to rpcTimeout. The real time needed to time 
> out a call therefore appears to be cumulative.
> For example, suppose the client issues call1 and call2 and then waits for 
> the results. If call1 takes (rpcTimeout - 1), it does not time out; if call2 
> then takes another (rpcTimeout - 1), it does not time out either. Yet call2 
> effectively took 2*(rpcTimeout - 1), which could be much bigger than 
> rpcTimeout, so it should have timed out.
> In the worst case, an RPC may take indefinitely long and never time out.
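
The accumulation described above can be illustrated with a small model. This is only a sketch, not Hadoop code: the class and method names are hypothetical, and it assumes that a read fails as soon as its own waiting counter reaches rpcTimeout, with the counter starting at 0 for every read.

```java
final class PerReadTimeoutModel {
    /**
     * Models one blocking read that needs readMillis before data arrives.
     * The waiting counter starts at 0 for every read, as in the quoted read() methods.
     * Assumption: the read fails once waiting reaches rpcTimeoutMillis.
     */
    static boolean readTimesOut(long readMillis, long soTimeoutMillis, long rpcTimeoutMillis) {
        long waiting = 0;
        // Every full soTimeout interval without data corresponds to one SocketTimeoutException.
        for (long elapsed = soTimeoutMillis; elapsed <= readMillis; elapsed += soTimeoutMillis) {
            waiting += soTimeoutMillis;
            if (waiting >= rpcTimeoutMillis) {
                return true;
            }
        }
        return false;
    }

    /** Total wall-clock time for a sequence of reads, or -1 if any single read times out. */
    static long totalElapsedMillis(long[] readMillis, long soTimeoutMillis, long rpcTimeoutMillis) {
        long total = 0;
        for (long r : readMillis) {
            if (readTimesOut(r, soTimeoutMillis, rpcTimeoutMillis)) {
                return -1;
            }
            total += r;
        }
        return total;
    }

    public static void main(String[] args) {
        long rpcTimeout = 10_000;
        long soTimeout = 1_000;
        // Two consecutive reads, each just under rpcTimeout: neither trips the per-read check.
        long[] reads = {rpcTimeout - 1, rpcTimeout - 1};
        long total = totalElapsedMillis(reads, soTimeout, rpcTimeout);
        System.out.println("total elapsed = " + total + " ms, rpcTimeout = " + rpcTimeout + " ms");
    }
}
```

With these illustrative values, neither read times out on its own, yet the second response arrives 19998 ms after both requests were sent, almost twice the configured 10000 ms.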
> It seems more accurate to remember the time that an RPC is sent to the 
> server, and then check time out here:
> {code:java}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId, int serviceClass,
>       AtomicBoolean fallbackToSimpleAuth) throws IOException {
>     final Call call = createCall(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call, serviceClass,
>       fallbackToSimpleAuth);
>     try {
>       connection.sendRpcRequest(call);                 // send the rpc request
>     } catch (RejectedExecutionException e) {
>       throw new IOException("connection has been closed", e);
>     } catch (InterruptedException e) {
>       Thread.currentThread().interrupt();
>       LOG.warn("interrupted waiting to send rpc request to server", e);
>       throw new IOException(e);
>     }
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           Thread.currentThread().interrupt();
>           throw new InterruptedIOException("Call interrupted");
>         } // <= should check how long it has waited here, and time out if
>           //    rpcTimeout has been reached
>       }  
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResponse();
>       }
>     }
>   }
> {code}
> Basically, we should change the call highlighted above from
> {code:java}
>     public final void wait() throws InterruptedException
> {code}
> to
> {code:java}
> public final void wait(long timeout, int nanos) throws InterruptedException
> {code}
> and pass rpcTimeout as the timeout argument (note that I'm ignoring 
> the time needed to send the RPC to the server; ideally we should include 
> that too, so that rpcTimeout means what it is intended to mean).
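
A deadline-based wait along the lines proposed above might look like the following. This is a minimal sketch under stated assumptions, not the actual Hadoop patch: `Call` is a hypothetical stand-in for the client's call object, `awaitResponse` and `sentAtNanos` are names invented here, and throwing `SocketTimeoutException` on expiry is an assumption. Measuring from the send time also covers the time spent sending the request.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

final class DeadlineWaitSketch {
    /** Hypothetical stand-in for the client's call object; only what the wait loop needs. */
    static final class Call {
        boolean done;
        Object response;

        synchronized void complete(Object value) {
            response = value;
            done = true;
            notifyAll();
        }
    }

    /**
     * Waits for the call to finish, but no longer than rpcTimeoutMillis measured
     * from sentAtNanos (a System.nanoTime() value taken when the request was sent).
     * A non-positive rpcTimeoutMillis means "wait indefinitely".
     */
    static Object awaitResponse(Call call, long sentAtNanos, long rpcTimeoutMillis)
            throws IOException {
        synchronized (call) {
            while (!call.done) {
                try {
                    if (rpcTimeoutMillis <= 0) {
                        call.wait();
                        continue;
                    }
                    long remainingNanos =
                        rpcTimeoutMillis * 1_000_000L - (System.nanoTime() - sentAtNanos);
                    if (remainingNanos <= 0) {
                        throw new SocketTimeoutException(
                            "Call timed out after " + rpcTimeoutMillis + " ms");
                    }
                    // Recompute the remaining time on every pass so that spurious
                    // wakeups do not extend the overall deadline.
                    call.wait(remainingNanos / 1_000_000L, (int) (remainingNanos % 1_000_000L));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("Call interrupted");
                }
            }
            return call.response;
        }
    }

    public static void main(String[] args) throws Exception {
        Call slow = new Call(); // never completed, so the wait must expire
        try {
            awaitResponse(slow, System.nanoTime(), 200);
        } catch (SocketTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }

        Call fast = new Call();
        Thread responder = new Thread(() -> fast.complete("ok"));
        responder.start();
        System.out.println("response: " + awaitResponse(fast, System.nanoTime(), 5_000));
        responder.join();
    }
}
```

Because the deadline is tied to the individual call rather than to each socket read, responses to other calls on the same connection cannot reset it.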
> Hi [~daryn] and [~kihwal], would you please help take a look at my 
> analysis above to see whether I have misunderstood anything?
> Thanks a lot.


