[ 
https://issues.apache.org/jira/browse/HADOOP-15530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongjun Zhang updated HADOOP-15530:
-----------------------------------
    Description: 
In Client.java, sendRpcRequest does the following

{code}
   /** Initiates a rpc call by sending the rpc request to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * @param call - the rpc request
     */
    public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      if (shouldCloseConnection.get()) {
        return;
      }

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,

      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      //
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader  - is serialized Delimited hence contains length
      // 2) RpcRequest
      //
      // Items '1' and '2' are prepared here. 
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
          clientId);

      final ResponseBuffer buf = new ResponseBuffer();
      header.writeDelimitedTo(buf);
      RpcWritable.wrap(call.rpcRequest).writeTo(buf);

      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (ipcStreams.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                if (LOG.isDebugEnabled()) {
                  LOG.debug(getName() + " sending #" + call.id
                      + " " + call.rpcRequest);
                }
                // RpcRequestHeader + RpcRequest
                ipcStreams.sendRequest(buf.toByteArray());
                ipcStreams.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite 
to
              // close early
              IOUtils.closeStream(buf);
            }
          }
        });

        try {
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();

          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
    }
{code}

It's observed that the call can be stuck at {{senderFuture.get();}}

Given that we support rpcTimeOut, we could chose the second method of Future 
below:
{code}
  /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
{code}

In theory, since the RPC at client is serialized, we could just use the main 
thread to do the execution, instead of using a threadpool to create new thread. 
This can be discussed in a separate jira.

And why the RPC is not processed and returned by NN is another topic.



                                              

  was:
In Client.java, sendRpcRequest does the following

{code}
   /** Initiates a rpc call by sending the rpc request to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * @param call - the rpc request
     */
    public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      if (shouldCloseConnection.get()) {
        return;
      }

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,

      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      //
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader  - is serialized Delimited hence contains length
      // 2) RpcRequest
      //
      // Items '1' and '2' are prepared here. 
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
          clientId);

      final ResponseBuffer buf = new ResponseBuffer();
      header.writeDelimitedTo(buf);
      RpcWritable.wrap(call.rpcRequest).writeTo(buf);

      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (ipcStreams.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                if (LOG.isDebugEnabled()) {
                  LOG.debug(getName() + " sending #" + call.id
                      + " " + call.rpcRequest);
                }
                // RpcRequestHeader + RpcRequest
                ipcStreams.sendRequest(buf.toByteArray());
                ipcStreams.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite 
to
              // close early
              IOUtils.closeStream(buf);
            }
          }
        });

        try {
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();

          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
    }
{code}

It's observed that the call can be stuck at {{senderFuture.get();}}

Given that we support rpcTimeOut, we could chose the second method of Future 
below:
{code}
  /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
{code}

In theory, since the RPC at client is serialized, we could just use the main 
thread to do the execution, instead of using a threadpool to create new thread. 
This can be discussed in a separate jira.




                                              


> RPC could stuck at senderFuture.get()
> -------------------------------------
>
>                 Key: HADOOP-15530
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15530
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Priority: Major
>
> In Client.java, sendRpcRequest does the following
> {code}
>    /** Initiates a rpc call by sending the rpc request to the remote server.
>      * Note: this is not called from the Connection thread, but by other
>      * threads.
>      * @param call - the rpc request
>      */
>     public void sendRpcRequest(final Call call)
>         throws InterruptedException, IOException {
>       if (shouldCloseConnection.get()) {
>         return;
>       }
>       // Serialize the call to be sent. This is done from the actual
>       // caller thread, rather than the sendParamsExecutor thread,
>       // so that if the serialization throws an error, it is reported
>       // properly. This also parallelizes the serialization.
>       //
>       // Format of a call on the wire:
>       // 0) Length of rest below (1 + 2)
>       // 1) RpcRequestHeader  - is serialized Delimited hence contains length
>       // 2) RpcRequest
>       //
>       // Items '1' and '2' are prepared here. 
>       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
>           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
>           clientId);
>       final ResponseBuffer buf = new ResponseBuffer();
>       header.writeDelimitedTo(buf);
>       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
>       synchronized (sendRpcRequestLock) {
>         Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>           @Override
>           public void run() {
>             try {
>               synchronized (ipcStreams.out) {
>                 if (shouldCloseConnection.get()) {
>                   return;
>                 }
>                 if (LOG.isDebugEnabled()) {
>                   LOG.debug(getName() + " sending #" + call.id
>                       + " " + call.rpcRequest);
>                 }
>                 // RpcRequestHeader + RpcRequest
>                 ipcStreams.sendRequest(buf.toByteArray());
>                 ipcStreams.flush();
>               }
>             } catch (IOException e) {
>               // exception at this point would leave the connection in an
>               // unrecoverable state (eg half a call left on the wire).
>               // So, close the connection, killing any outstanding calls
>               markClosed(e);
>             } finally {
>               //the buffer is just an in-memory buffer, but it is still 
> polite to
>               // close early
>               IOUtils.closeStream(buf);
>             }
>           }
>         });
>         try {
>           senderFuture.get();
>         } catch (ExecutionException e) {
>           Throwable cause = e.getCause();
>           // cause should only be a RuntimeException as the Runnable above
>           // catches IOException
>           if (cause instanceof RuntimeException) {
>             throw (RuntimeException) cause;
>           } else {
>             throw new RuntimeException("unexpected checked exception", cause);
>           }
>         }
>       }
>     }
> {code}
> It's observed that the call can be stuck at {{senderFuture.get();}}
> Given that we support rpcTimeOut, we could chose the second method of Future 
> below:
> {code}
>   /**
>      * Waits if necessary for the computation to complete, and then
>      * retrieves its result.
>      *
>      * @return the computed result
>      * @throws CancellationException if the computation was cancelled
>      * @throws ExecutionException if the computation threw an
>      * exception
>      * @throws InterruptedException if the current thread was interrupted
>      * while waiting
>      */
>     V get() throws InterruptedException, ExecutionException;
>     /**
>      * Waits if necessary for at most the given time for the computation
>      * to complete, and then retrieves its result, if available.
>      *
>      * @param timeout the maximum time to wait
>      * @param unit the time unit of the timeout argument
>      * @return the computed result
>      * @throws CancellationException if the computation was cancelled
>      * @throws ExecutionException if the computation threw an
>      * exception
>      * @throws InterruptedException if the current thread was interrupted
>      * while waiting
>      * @throws TimeoutException if the wait timed out
>      */
>     V get(long timeout, TimeUnit unit)
>         throws InterruptedException, ExecutionException, TimeoutException;
> {code}
> In theory, since the RPC at client is serialized, we could just use the main 
> thread to do the execution, instead of using a threadpool to create new 
> thread. This can be discussed in a separate jira.
> And why the RPC is not processed and returned by NN is another topic.
>                                               



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to