[jira] [Commented] (HADOOP-15720) rpcTimeout may not have been applied correctly

2022-03-28 Thread Wenzhe Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17513673#comment-17513673
 ] 

Wenzhe Zhou commented on HADOOP-15720:
--

Hi Hadoop team, is anyone actively working on this issue? Is there an ETA?

> rpcTimeout may not have been applied correctly
> --
>
> Key: HADOOP-15720
> URL: https://issues.apache.org/jira/browse/HADOOP-15720
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: common
>Reporter: Yongjun Zhang
>Priority: Major
>
> org.apache.hadoop.ipc.Client sends multiple RPC calls to the server
> synchronously via the same connection, as in the following synchronized code block:
> {code:java}
>   synchronized (sendRpcRequestLock) {
>     Future senderFuture = sendParamsExecutor.submit(new Runnable() {
>       @Override
>       public void run() {
>         try {
>           synchronized (Connection.this.out) {
>             if (shouldCloseConnection.get()) {
>               return;
>             }
>
>             if (LOG.isDebugEnabled()) {
>               LOG.debug(getName() + " sending #" + call.id
>                   + " " + call.rpcRequest);
>             }
>
>             byte[] data = d.getData();
>             int totalLength = d.getLength();
>             out.writeInt(totalLength); // Total Length
>             out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
>             out.flush();
>           }
>         } catch (IOException e) {
>           // exception at this point would leave the connection in an
>           // unrecoverable state (eg half a call left on the wire).
>           // So, close the connection, killing any outstanding calls
>           markClosed(e);
>         } finally {
>           // the buffer is just an in-memory buffer, but it is still polite to
>           // close early
>           IOUtils.closeStream(d);
>         }
>       }
>     });
>
>     try {
>       senderFuture.get();
>     } catch (ExecutionException e) {
>       Throwable cause = e.getCause();
>
>       // cause should only be a RuntimeException as the Runnable above
>       // catches IOException
>       if (cause instanceof RuntimeException) {
>         throw (RuntimeException) cause;
>       } else {
>         throw new RuntimeException("unexpected checked exception", cause);
>       }
>     }
>   }
> {code}
> It then waits for the results asynchronously via
> {code:java}
> /* Receive a response.
>  * Because only one receiver, so no synchronization on in.
>  */
> private void receiveRpcResponse() {
>   if (shouldCloseConnection.get()) {
>     return;
>   }
>   touch();
>
>   try {
>     int totalLen = in.readInt();
>     RpcResponseHeaderProto header =
>         RpcResponseHeaderProto.parseDelimitedFrom(in);
>     checkResponse(header);
>     int headerLen = header.getSerializedSize();
>     headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
>     int callId = header.getCallId();
>     if (LOG.isDebugEnabled())
>       LOG.debug(getName() + " got value #" + callId);
>     Call call = calls.get(callId);
>     RpcStatusProto status = header.getStatus();
>     ..
> {code}
> However, we can see that the {{call}} objects returned by
> {{receiveRpcResponse()}} above may arrive in any order.
> The following code
> {code:java}
> int totalLen = in.readInt();
> {code}
> eventually calls one of the following two methods, where {{rpcTimeout}} is
> checked:
> {code:java}
>   /** Read a byte from the stream.
>    * Send a ping if timeout on read. Retries if no failure is detected
>    * until a byte is read.
>    * @throws IOException for any IO problem other than socket timeout
>    */
>   @Override
>   public int read() throws IOException {
>     int waiting = 0;
>     do {
>       try {
>         return super.read();
>       } catch (SocketTimeoutException e) {
>         waiting += soTimeout;
>         handleTimeout(e, waiting);
>       }
>     } while (true);
>   }
>
>   /** Read bytes into a buffer starting from offset off
>    * Send a ping if timeout on read. Retries if no failure is detected
>    * until a byte is read.
>    *
>    * @return the total number of bytes read; -1 if the connection is closed.
>    */
>   @Override
>   public int read(byte[] buf, int off, int len) throws IOException {
>     int waiting = 0;
>     do {
>       try {
>         return super.read(buf, off, len);
>       } catch

[jira] [Commented] (HADOOP-15720) rpcTimeout may not have been applied correctly

2019-03-04 Thread Tsz Wo Nicholas Sze (JIRA)


[ 
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783807#comment-16783807
 ] 

Tsz Wo Nicholas Sze commented on HADOOP-15720:
--

Hi [~yzhangal], if the bug is that the RPC timeout is not enforced, it should
be very easy to test.  Why don't we illustrate it in a unit test?
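
A minimal, self-contained sketch of what such a test could model is below. It is not Hadoop code: the class name `ReadTimeoutSimulation`, the method `connectionTimesOut` and the gap values are made up for illustration, and the rule that the timeout fires once `waiting >= rpcTimeout` is an assumption about what `handleTimeout` does. The sketch replays the gaps between responses arriving on one shared connection and checks whether the per-read `waiting` counter ever reaches the RPC timeout.

```java
public class ReadTimeoutSimulation {

    /**
     * Replays the gaps (in ms) between successive responses on one connection.
     * Models the read() loop from the description: `waiting` starts at 0 for
     * each read and grows by soTimeoutMs on every socket timeout.
     * ASSUMPTION: the timeout is raised once waiting >= rpcTimeoutMs.
     */
    public static boolean connectionTimesOut(long[] gapsMs, long soTimeoutMs,
                                             long rpcTimeoutMs) {
        if (soTimeoutMs <= 0) {
            throw new IllegalArgumentException("soTimeoutMs must be positive");
        }
        for (long gap : gapsMs) {
            long waiting = 0;        // reset for every new read
            long remaining = gap;
            while (remaining >= soTimeoutMs) {
                // A socket timeout fires before the next response arrives.
                waiting += soTimeoutMs;
                remaining -= soTimeoutMs;
                if (rpcTimeoutMs > 0 && waiting >= rpcTimeoutMs) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Total time a call sent at t=0 has been outstanding after all gaps. */
    public static long totalElapsedMs(long[] gapsMs) {
        long total = 0;
        for (long gap : gapsMs) {
            total += gap;
        }
        return total;
    }

    public static void main(String[] args) {
        long soTimeoutMs = 1000;
        long rpcTimeoutMs = 3000;
        // Other responses keep arriving every 2.5 s; R1 is answered last.
        long[] busy = {2500, 2500, 2500, 2500};
        // Nothing arrives for 10 s.
        long[] idle = {10000};

        System.out.println("R1 outstanding for " + totalElapsedMs(busy) + " ms");
        System.out.println("busy connection times out: "
            + connectionTimesOut(busy, soTimeoutMs, rpcTimeoutMs));
        System.out.println("idle connection times out: "
            + connectionTimesOut(idle, soTimeoutMs, rpcTimeoutMs));
    }
}
```

With these made-up numbers the busy connection never reports a timeout although the first call has been outstanding for far longer than the RPC timeout, while the idle connection does. A real unit test would need a server that delays one response while answering others.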


[jira] [Commented] (HADOOP-15720) rpcTimeout may not have been applied correctly

2019-02-26 Thread Yongjun Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16778657#comment-16778657
 ] 

Yongjun Zhang commented on HADOOP-15720:


The latest RPC code has been changed (by HADOOP-12909 and related issues) to
support an asynchronous mode in addition to the original synchronous mode.

The synchronous-mode RPC behavior seems to have changed, and the new code
appears not to apply the RPC timeout in synchronous mode (why?), see:

[https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java#L1490]

while it does apply the RPC timeout in asynchronous mode, see:

[https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java#L1465]

I reported this jira against the old synchronous-mode implementation (in which
the sending of RPCs is serialized, but the responses are received
asynchronously).
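
The difference between waiting with and without a timeout can be illustrated with plain `java.util.concurrent` types. This is only a sketch, not the code in `Client.java`: the class `ResponseWaitSketch`, the method `awaitResponse` and the convention that a negative timeout means "wait indefinitely" are assumptions made for the example.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResponseWaitSketch {

    /**
     * Waits for a response.
     * ASSUMPTION: a negative timeoutMs means "no deadline", so the caller
     * blocks until the response arrives, however long that takes.
     * Returns an empty Optional if the deadline passes first.
     */
    public static <T> Optional<T> awaitResponse(CompletableFuture<T> response,
                                                long timeoutMs) {
        try {
            if (timeoutMs < 0) {
                return Optional.ofNullable(response.get());
            }
            return Optional.ofNullable(
                response.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> answered =
            CompletableFuture.completedFuture("ok");
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();

        System.out.println(awaitResponse(answered, -1).isPresent());
        System.out.println(awaitResponse(neverAnswered, 50).isPresent());
        // awaitResponse(neverAnswered, -1) would block forever.
    }
}
```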

Thanks.


[jira] [Commented] (HADOOP-15720) rpcTimeout may not have been applied correctly

2019-02-21 Thread Yongjun Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16774526#comment-16774526
 ] 

Yongjun Zhang commented on HADOOP-15720:


Hi [~arpaga], [~szetszwo], [~jojochuang],

Could you help take a look at my analysis? If we agree, we can push forward
on a solution.

In summary, multiple RPC calls (R1, R2, R3, ..., Rn) are sent to the NN by the
same client synchronously, and the client then waits for the RPC responses
asynchronously.  Since the responses to R1, R2, ..., Rn can be received in any
order, the RPC timeout on each of them is not really enforced, due to the
logic I described. I would like to propose a change that enforces the RPC
timeout on individual RPC calls.
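
One possible shape for per-call enforcement is sketched below. It is an illustration only, not a patch: `CallDeadlineTracker` and its methods are hypothetical names, and the idea of recording a deadline when a call is sent and sweeping expired calls periodically is an assumption about how such a change could look.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that tracks one deadline per outstanding call. */
public class CallDeadlineTracker {
    private final long rpcTimeoutMs;
    private final Map<Integer, Long> deadlines = new LinkedHashMap<>();

    public CallDeadlineTracker(long rpcTimeoutMs) {
        this.rpcTimeoutMs = rpcTimeoutMs;
    }

    /** Record the deadline for a call at the moment it is sent. */
    public synchronized void onSend(int callId, long nowMs) {
        deadlines.put(callId, nowMs + rpcTimeoutMs);
    }

    /** Forget a call once its response has been received. */
    public synchronized void onResponse(int callId) {
        deadlines.remove(callId);
    }

    /**
     * Remove and return the ids of all calls whose deadline has passed,
     * regardless of whether other responses arrived in the meantime.
     */
    public synchronized List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        CallDeadlineTracker tracker = new CallDeadlineTracker(3000);
        tracker.onSend(1, 0);      // R1 sent at t=0
        tracker.onSend(2, 100);    // R2 sent at t=100
        tracker.onResponse(2);     // R2 answered first
        System.out.println(tracker.expire(2999));
        System.out.println(tracker.expire(3000));
    }
}
```

The point of the design is that the deadline belongs to the call and not to the connection, so a response to R2 does not extend the time allowed for R1.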

Thanks.
