HBASE-15520 Fix broken TestAsyncIPC

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd5c0934
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd5c0934
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd5c0934

Branch: refs/heads/hbase-12439
Commit: fd5c0934b60664ecdde21a994910953339c7060d
Parents: 925c185
Author: zhangduo <zhang...@apache.org>
Authored: Thu Mar 24 10:24:34 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Mar 24 10:46:44 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       | 271 +++++++++----------
 .../hbase/ipc/AsyncServerResponseHandler.java   | 122 +++++----
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   3 +-
 3 files changed, 188 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5c0934/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 787aa47..53eb824 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -17,18 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -52,6 +40,7 @@ import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import 
org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
 import org.apache.hadoop.hbase.security.AuthMethod;
@@ -75,6 +64,18 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
 /**
  * Netty RPC channel
  */
@@ -84,12 +85,12 @@ public class AsyncRpcChannel {
 
   private static final int MAX_SASL_RETRIES = 5;
 
-  protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, 
TokenSelector<? extends
-      TokenIdentifier>> tokenHandlers = new HashMap<>();
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> 
TOKEN_HANDDLERS
+    = new HashMap<>();
 
   static {
-    
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
-        new AuthenticationTokenSelector());
+    
TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
+      new AuthenticationTokenSelector());
   }
 
   final AsyncRpcClient client;
@@ -111,7 +112,6 @@ public class AsyncRpcChannel {
   private Token<? extends TokenIdentifier> token;
   private String serverPrincipal;
 
-
   // NOTE: closed and connected flags below are only changed when a lock on 
pendingCalls
   private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, 
AsyncCall>();
   private boolean connected = false;
@@ -128,15 +128,14 @@ public class AsyncRpcChannel {
 
   /**
    * Constructor for netty RPC channel
-   *
    * @param bootstrap to construct channel on
-   * @param client    to connect with
+   * @param client to connect with
    * @param ticket of user which uses connection
    * @param serviceName name of service to connect to
    * @param address to connect to
    */
-  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, 
User ticket, String
-      serviceName, InetSocketAddress address) {
+  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, 
User ticket,
+      String serviceName, InetSocketAddress address) {
     this.client = client;
 
     this.ticket = ticket;
@@ -145,16 +144,12 @@ public class AsyncRpcChannel {
 
     this.channel = connect(bootstrap).channel();
 
-    name = ("IPC Client (" + channel.hashCode() + ") to " +
-        address.toString() +
-        ((ticket == null) ?
-            " from unknown user" :
-            (" from " + ticket.getName())));
+    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
+        + ((ticket == null) ? " from unknown user" : (" from " + 
ticket.getName())));
   }
 
   /**
    * Connect to channel
-   *
    * @param bootstrap to connect to
    * @return future of connection
    */
@@ -209,12 +204,11 @@ public class AsyncRpcChannel {
 
   /**
    * Start HBase connection
-   *
    * @param ch channel to start connection on
    */
   private void startHBaseConnection(Channel ch) {
-    ch.pipeline()
-        .addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    ch.pipeline().addLast("frameDecoder",
+      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
     try {
       writeChannelHeader(ch).addListener(new 
GenericFutureListener<ChannelFuture>() {
@@ -248,7 +242,8 @@ public class AsyncRpcChannel {
   private SaslClientHandler getSaslHandler(final UserGroupInformation 
realTicket,
       final Bootstrap bootstrap) throws IOException {
     return new SaslClientHandler(realTicket, authMethod, token, 
serverPrincipal,
-        client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
+        client.fallbackAllowed,
+        client.conf.get("hbase.rpc.protection",
           SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
         new SaslClientHandler.SaslExceptionHandler() {
           @Override
@@ -258,7 +253,7 @@ public class AsyncRpcChannel {
               handleSaslConnectionFailure(retryCount, cause, realTicket);
 
               retryOrClose(bootstrap, failureCounter++, 
random.nextInt(reloginMaxBackoff) + 1,
-                  cause);
+                cause);
             } catch (IOException | InterruptedException e) {
               close(e);
             }
@@ -273,13 +268,12 @@ public class AsyncRpcChannel {
 
   /**
    * Retry to connect or close
-   *
-   * @param bootstrap      to connect with
-   * @param failureCount   failure count
-   * @param e              exception of fail
+   * @param bootstrap to connect with
+   * @param failureCount failure count
+   * @param e exception of fail
    */
-  private void retryOrClose(final Bootstrap bootstrap, int failureCount,
-      long timeout, Throwable e) {
+  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long 
timeout,
+      Throwable e) {
     if (failureCount < client.maxRetries) {
       client.newTimeout(new TimerTask() {
         @Override
@@ -303,9 +297,8 @@ public class AsyncRpcChannel {
   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
       final PayloadCarryingRpcController controller, final Message request,
       final Message responsePrototype, MetricsConnection.CallStats callStats) {
-    final AsyncCall call =
-        new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), 
method, request,
-            controller, responsePrototype, callStats);
+    final AsyncCall call = new AsyncCall(channel.eventLoop(), 
client.callIdCnt.getAndIncrement(),
+        method, request, controller, responsePrototype, callStats);
     controller.notifyOnCancel(new RpcCallback<Object>() {
       @Override
       public void run(Object parameter) {
@@ -331,9 +324,7 @@ public class AsyncRpcChannel {
       pendingCalls.put(call.id, call);
       // Add timeout for cleanup if none is present
       if (cleanupTimer == null && call.getRpcTimeout() > 0) {
-        cleanupTimer =
-            client.newTimeout(timeoutTask, call.getRpcTimeout(),
-              TimeUnit.MILLISECONDS);
+        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
       }
       if (!connected) {
         return call;
@@ -351,14 +342,13 @@ public class AsyncRpcChannel {
 
   /**
    * Write the channel header
-   *
    * @param channel to write to
    * @return future of write
    * @throws java.io.IOException on failure to write
    */
   private ChannelFuture writeChannelHeader(Channel channel) throws IOException 
{
-    RPCProtos.ConnectionHeader.Builder headerBuilder =
-        RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
+    RPCProtos.ConnectionHeader.Builder headerBuilder = 
RPCProtos.ConnectionHeader.newBuilder()
+        .setServiceName(serviceName);
 
     RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), 
authMethod);
     if (userInfoPB != null) {
@@ -375,7 +365,6 @@ public class AsyncRpcChannel {
     headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
     RPCProtos.ConnectionHeader header = headerBuilder.build();
 
-
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
 
     ByteBuf b = channel.alloc().directBuffer(totalSize);
@@ -388,20 +377,19 @@ public class AsyncRpcChannel {
 
   /**
    * Write request to channel
-   *
-   * @param call    to write
+   * @param call to write
    */
   private void writeRequest(final AsyncCall call) {
     try {
       final RPCProtos.RequestHeader.Builder requestHeaderBuilder = 
RPCProtos.RequestHeader
           .newBuilder();
-      requestHeaderBuilder.setCallId(call.id)
-              .setMethodName(call.method.getName()).setRequestParam(call.param 
!= null);
+      
requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
+          .setRequestParam(call.param != null);
 
       if (Trace.isTracing()) {
         Span s = Trace.currentSpan();
-        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
-            setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
+            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
       }
 
       ByteBuffer cellBlock = 
client.buildCellBlock(call.controller.cellScanner());
@@ -411,7 +399,7 @@ public class AsyncRpcChannel {
         cellBlockBuilder.setLength(cellBlock.limit());
         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
       }
-      // Only pass priority if there one.  Let zero be same as no priority.
+      // Only pass priority if there one. Let zero be same as no priority.
       if (call.controller.getPriority() != 
PayloadCarryingRpcController.PRIORITY_UNSET) {
         requestHeaderBuilder.setPriority(call.controller.getPriority());
       }
@@ -424,7 +412,7 @@ public class AsyncRpcChannel {
       }
 
       ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
-      try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
+      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
         call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, 
cellBlock));
       }
 
@@ -436,7 +424,6 @@ public class AsyncRpcChannel {
 
   /**
    * Set up server authorization
-   *
    * @throws java.io.IOException if auth setup failed
    */
   private void setupAuthorization() throws IOException {
@@ -447,10 +434,10 @@ public class AsyncRpcChannel {
     if (useSasl && securityInfo != null) {
       AuthenticationProtos.TokenIdentifier.Kind tokenKind = 
securityInfo.getTokenKind();
       if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = 
tokenHandlers.get(tokenKind);
+        TokenSelector<? extends TokenIdentifier> tokenSelector = 
TOKEN_HANDDLERS.get(tokenKind);
         if (tokenSelector != null) {
-          token = tokenSelector
-              .selectToken(new Text(client.clusterId), 
ticket.getUGI().getTokens());
+          token = tokenSelector.selectToken(new Text(client.clusterId),
+            ticket.getUGI().getTokens());
         } else if (LOG.isDebugEnabled()) {
           LOG.debug("No token selector found for type " + tokenKind);
         }
@@ -460,7 +447,7 @@ public class AsyncRpcChannel {
         throw new IOException("Can't obtain server Kerberos config key from 
SecurityInfo");
       }
       this.serverPrincipal = 
SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
-          address.getAddress().getCanonicalHostName().toLowerCase());
+        address.getAddress().getCanonicalHostName().toLowerCase());
       if (LOG.isDebugEnabled()) {
         LOG.debug("RPC Server Kerberos principal name for service=" + 
serviceName + " is "
             + serverPrincipal);
@@ -476,16 +463,15 @@ public class AsyncRpcChannel {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Use " + authMethod + " authentication for service " + 
serviceName +
-          ", sasl=" + useSasl);
+      LOG.debug(
+        "Use " + authMethod + " authentication for service " + serviceName + 
", sasl=" + useSasl);
     }
     reloginMaxBackoff = 
client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
   }
 
   /**
    * Build the user information
-   *
-   * @param ugi        User Group Information
+   * @param ugi User Group Information
    * @param authMethod Authorization method
    * @return UserInformation protobuf
    */
@@ -499,7 +485,7 @@ public class AsyncRpcChannel {
       // Send effective user for Kerberos auth
       userInfoPB.setEffectiveUser(ugi.getUserName());
     } else if (authMethod == AuthMethod.SIMPLE) {
-      //Send both effective user and real user for simple auth
+      // Send both effective user and real user for simple auth
       userInfoPB.setEffectiveUser(ugi.getUserName());
       if (ugi.getRealUser() != null) {
         userInfoPB.setRealUser(ugi.getRealUser().getUserName());
@@ -510,8 +496,7 @@ public class AsyncRpcChannel {
 
   /**
    * Create connection preamble
-   *
-   * @param byteBuf    to write to
+   * @param byteBuf to write to
    * @param authMethod to write
    */
   private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
@@ -520,53 +505,61 @@ public class AsyncRpcChannel {
     byteBuf.writeByte(authMethod.code);
   }
 
+  private void close0(Throwable e) {
+    List<AsyncCall> toCleanup;
+    synchronized (pendingCalls) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+      pendingCalls.clear();
+    }
+    IOException closeException = null;
+    if (e != null) {
+      if (e instanceof IOException) {
+        closeException = (IOException) e;
+      } else {
+        closeException = new IOException(e);
+      }
+    }
+    // log the info
+    if (LOG.isDebugEnabled() && closeException != null) {
+      LOG.debug(name + ": closing ipc connection to " + address, 
closeException);
+    }
+    if (cleanupTimer != null) {
+      cleanupTimer.cancel();
+      cleanupTimer = null;
+    }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(closeException != null ? closeException
+          : new ConnectionClosingException(
+              "Call id=" + call.id + " on server " + address + " aborted: 
connection is closing"));
+    }
+    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(name + ": closed");
+    }
+  }
+
   /**
    * Close connection
-   *
    * @param e exception on close
    */
   public void close(final Throwable e) {
     client.removeConnection(this);
 
     // Move closing from the requesting thread to the channel thread
-    channel.eventLoop().execute(new Runnable() {
-      @Override
-      public void run() {
-        List<AsyncCall> toCleanup;
-        synchronized (pendingCalls) {
-          if (closed) {
-            return;
-          }
-          closed = true;
-          toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
-          pendingCalls.clear();
-        }
-        IOException closeException = null;
-        if (e != null) {
-          if (e instanceof IOException) {
-            closeException = (IOException) e;
-          } else {
-            closeException = new IOException(e);
-          }
-        }
-        // log the info
-        if (LOG.isDebugEnabled() && closeException != null) {
-          LOG.debug(name + ": closing ipc connection to " + address, 
closeException);
-        }
-        if (cleanupTimer != null) {
-          cleanupTimer.cancel();
-          cleanupTimer = null;
-        }
-        for (AsyncCall call : toCleanup) {
-          call.setFailed(closeException != null ? closeException : new 
ConnectionClosingException(
-              "Call id=" + call.id + " on server " + address + " aborted: 
connection is closing"));
-        }
-        channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(name + ": closed");
+    if (channel.eventLoop().inEventLoop()) {
+      close0(e);
+    } else {
+      channel.eventLoop().execute(new Runnable() {
+        @Override
+        public void run() {
+          close0(e);
         }
-      }
-    });
+      });
+    }
   }
 
   /**
@@ -592,9 +585,7 @@ public class AsyncRpcChannel {
         }
       }
       if (nextCleanupTaskDelay > 0) {
-        cleanupTimer =
-            client.newTimeout(timeoutTask, nextCleanupTaskDelay,
-              TimeUnit.MILLISECONDS);
+        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, 
TimeUnit.MILLISECONDS);
       } else {
         cleanupTimer = null;
       }
@@ -607,7 +598,6 @@ public class AsyncRpcChannel {
 
   /**
    * Check if the connection is alive
-   *
    * @return true if alive
    */
   public boolean isAlive() {
@@ -616,7 +606,6 @@ public class AsyncRpcChannel {
 
   /**
    * Check if user should authenticate over Kerberos
-   *
    * @return true if should be authenticated over Kerberos
    * @throws java.io.IOException on failure of check
    */
@@ -624,37 +613,31 @@ public class AsyncRpcChannel {
     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS &&
-        loginUser != null &&
-        //Make sure user logged in using Kerberos either keytab or TGT
-        loginUser.hasKerberosCredentials() &&
-        // relogin only in case it is the login user (e.g. JT)
-        // or superuser (like oozie).
-        (loginUser.equals(currentUser) || loginUser.equals(realUser));
+    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
+      // Make sure user logged in using Kerberos either keytab or TGT
+      loginUser.hasKerberosCredentials() &&
+      // relogin only in case it is the login user (e.g. JT)
+      // or superuser (like oozie).
+      (loginUser.equals(currentUser) || loginUser.equals(realUser));
   }
 
   /**
-   * If multiple clients with the same principal try to connect
-   * to the same server at the same time, the server assumes a
-   * replay attack is in progress. This is a feature of kerberos.
-   * In order to work around this, what is done is that the client
-   * backs off randomly and tries to initiate the connection
-   * again.
-   * The other problem is to do with ticket expiry. To handle that,
-   * a relogin is attempted.
+   * If multiple clients with the same principal try to connect to the same 
server at the same time,
+   * the server assumes a replay attack is in progress. This is a feature of 
kerberos. In order to
+   * work around this, what is done is that the client backs off randomly and 
tries to initiate the
+   * connection again. The other problem is to do with ticket expiry. To 
handle that, a relogin is
+   * attempted.
    * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb}
-   * method. In case when the user doesn't have valid credentials, we don't
-   * need to retry (from cache or ticket). In such cases, it is prudent to
-   * throw a runtime exception when we receive a SaslException from the
-   * underlying authentication implementation, so there is no retry from
-   * other high level (for eg, HCM or HBaseAdmin).
+   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} 
method. In case when the
+   * user doesn't have valid credentials, we don't need to retry (from cache 
or ticket). In such
+   * cases, it is prudent to throw a runtime exception when we receive a 
SaslException from the
+   * underlying authentication implementation, so there is no retry from other 
high level (for eg,
+   * HCM or HBaseAdmin).
    * </p>
-   *
    * @param currRetries retry count
-   * @param ex          exception describing fail
-   * @param user        which is trying to connect
-   * @throws java.io.IOException  if IO fail
+   * @param ex exception describing fail
+   * @param user which is trying to connect
+   * @throws java.io.IOException if IO fail
    * @throws InterruptedException if thread is interrupted
    */
   private void handleSaslConnectionFailure(final int currRetries, final 
Throwable ex,
@@ -665,7 +648,7 @@ public class AsyncRpcChannel {
         if (shouldAuthenticateOverKrb()) {
           if (currRetries < MAX_SASL_RETRIES) {
             LOG.debug("Exception encountered while connecting to the server : 
" + ex);
-            //try re-login
+            // try re-login
             if (UserGroupInformation.isLoginKeytabBased()) {
               UserGroupInformation.getLoginUser().reloginFromKeytab();
             } else {
@@ -675,23 +658,20 @@ public class AsyncRpcChannel {
             // Should reconnect
             return null;
           } else {
-            String msg = "Couldn't setup connection for " +
-                UserGroupInformation.getLoginUser().getUserName() +
-                " to " + serverPrincipal;
+            String msg = "Couldn't setup connection for "
+                + UserGroupInformation.getLoginUser().getUserName() + " to " + 
serverPrincipal;
             LOG.warn(msg);
             throw (IOException) new IOException(msg).initCause(ex);
           }
         } else {
-          LOG.warn("Exception encountered while connecting to " +
-              "the server : " + ex);
+          LOG.warn("Exception encountered while connecting to " + "the server 
: " + ex);
         }
         if (ex instanceof RemoteException) {
           throw (RemoteException) ex;
         }
         if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed." +
-              " The most likely cause is missing or invalid credentials." +
-              " Consider 'kinit'.";
+          String msg = "SASL authentication failed."
+              + " The most likely cause is missing or invalid credentials." + 
" Consider 'kinit'.";
           LOG.fatal(msg, ex);
           throw new RuntimeException(msg, ex);
         }
@@ -718,7 +698,6 @@ public class AsyncRpcChannel {
     return false;
   }
 
-
   @Override
   public String toString() {
     return this.address.toString() + "/" + this.serviceName + "/" + 
this.ticket;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5c0934/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index 8f6c85b..e0c7586 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.CellScanner;
@@ -32,82 +27,89 @@ import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.protobuf.Message;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
 /**
  * Handles Hbase responses
  */
 @InterfaceAudience.Private
-public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
+public class AsyncServerResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
   private final AsyncRpcChannel channel;
 
   /**
    * Constructor
-   *
    * @param channel on which this response handler operates
    */
   public AsyncServerResponseHandler(AsyncRpcChannel channel) {
     this.channel = channel;
   }
 
-  @Override public void channelRead(ChannelHandlerContext ctx, Object msg) 
throws Exception {
-    ByteBuf inBuffer = (ByteBuf) msg;
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) 
throws Exception {
     ByteBufInputStream in = new ByteBufInputStream(inBuffer);
     int totalSize = inBuffer.readableBytes();
-    try {
-      // Read the header
-      RPCProtos.ResponseHeader responseHeader = 
RPCProtos.ResponseHeader.parseDelimitedFrom(in);
-      int id = responseHeader.getCallId();
-      AsyncCall call = channel.removePendingCall(id);
-      if (call == null) {
-        // So we got a response for which we have no corresponding 'call' here 
on the client-side.
-        // We probably timed out waiting, cleaned up all references, and now 
the server decides
-        // to return a response.  There is nothing we can do w/ the response 
at this stage. Clean
-        // out the wire of the response so its out of the way and we can get 
other responses on
-        // this connection.
-        int readSoFar = 
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
-        int whatIsLeftToRead = totalSize - readSoFar;
+    // Read the header
+    RPCProtos.ResponseHeader responseHeader = 
RPCProtos.ResponseHeader.parseDelimitedFrom(in);
+    int id = responseHeader.getCallId();
+    AsyncCall call = channel.removePendingCall(id);
+    if (call == null) {
+      // So we got a response for which we have no corresponding 'call' here 
on the client-side.
+      // We probably timed out waiting, cleaned up all references, and now the 
server decides
+      // to return a response. There is nothing we can do w/ the response at 
this stage. Clean
+      // out the wire of the response so its out of the way and we can get 
other responses on
+      // this connection.
+      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
+      int whatIsLeftToRead = totalSize - readSoFar;
 
-        // This is done through a Netty ByteBuf which has different behavior 
than InputStream.
-        // It does not return number of bytes read but will update pointer 
internally and throws an
-        // exception when too many bytes are to be skipped.
-        inBuffer.skipBytes(whatIsLeftToRead);
-        return;
-      }
+      // This is done through a Netty ByteBuf which has different behavior 
than InputStream.
+      // It does not return number of bytes read but will update pointer 
internally and throws an
+      // exception when too many bytes are to be skipped.
+      inBuffer.skipBytes(whatIsLeftToRead);
+      return;
+    }
 
-      if (responseHeader.hasException()) {
-        RPCProtos.ExceptionResponse exceptionResponse = 
responseHeader.getException();
-        RemoteException re = createRemoteException(exceptionResponse);
-        if (exceptionResponse.getExceptionClassName().
-            equals(FatalConnectionException.class.getName())) {
-          channel.close(re);
-        } else {
-          call.setFailed(re);
-        }
+    if (responseHeader.hasException()) {
+      RPCProtos.ExceptionResponse exceptionResponse = 
responseHeader.getException();
+      RemoteException re = createRemoteException(exceptionResponse);
+      if (exceptionResponse.getExceptionClassName()
+          .equals(FatalConnectionException.class.getName())) {
+        channel.close(re);
       } else {
-        Message value = null;
-        // Call may be null because it may have timedout and been cleaned up 
on this side already
-        if (call.responseDefaultType != null) {
-          Message.Builder builder = 
call.responseDefaultType.newBuilderForType();
-          ProtobufUtil.mergeDelimitedFrom(builder, in);
-          value = builder.build();
-        }
-        CellScanner cellBlockScanner = null;
-        if (responseHeader.hasCellBlockMeta()) {
-          int size = responseHeader.getCellBlockMeta().getLength();
-          byte[] cellBlock = new byte[size];
-          inBuffer.readBytes(cellBlock, 0, cellBlock.length);
-          cellBlockScanner = channel.client.createCellScanner(cellBlock);
-        }
-        call.setSuccess(value, cellBlockScanner);
-        call.callStats.setResponseSizeBytes(totalSize);
+        call.setFailed(re);
+      }
+    } else {
+      Message value = null;
+      // Call may be null because it may have timedout and been cleaned up on 
this side already
+      if (call.responseDefaultType != null) {
+        Message.Builder builder = call.responseDefaultType.newBuilderForType();
+        ProtobufUtil.mergeDelimitedFrom(builder, in);
+        value = builder.build();
+      }
+      CellScanner cellBlockScanner = null;
+      if (responseHeader.hasCellBlockMeta()) {
+        int size = responseHeader.getCellBlockMeta().getLength();
+        byte[] cellBlock = new byte[size];
+        inBuffer.readBytes(cellBlock, 0, cellBlock.length);
+        cellBlockScanner = channel.client.createCellScanner(cellBlock);
       }
-    } catch (IOException e) {
-      // Treat this as a fatal condition and close this connection
-      channel.close(e);
-    } finally {
-      inBuffer.release();
+      call.setSuccess(value, cellBlockScanner);
+      call.callStats.setResponseSizeBytes(totalSize);
     }
   }
 
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+    channel.close(cause);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    channel.close(new IOException("connection closed"));
+  }
+
   /**
    * @param e Proto exception
    * @return RemoteException made from passed <code>e</code>
@@ -118,7 +120,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
     return e.hasHostname() ?
         // If a hostname then add it to the RemoteWithExtrasException
         new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), e.getHostname(),
-            e.getPort(), doNotRetry) :
-        new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), doNotRetry);
+            e.getPort(), doNotRetry)
+        : new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), doNotRetry);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5c0934/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index e8da9ee..69c8fe2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.http.ConnectionClosedException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -320,7 +319,7 @@ public abstract class AbstractTestIPC {
           md.getOutputType().toProto(), User.getCurrent(), address,
           new MetricsConnection.CallStats());
         fail("RPC should have failed because it exceeds max request size");
-      } catch(ConnectionClosingException | ConnectionClosedException ex) {
+      } catch(IOException ex) {
         // pass
       }
     } finally {

Reply via email to