Repository: hbase
Updated Branches:
  refs/heads/master 28d619b22 -> 3f75ba195


HBASE-18013 Write response directly instead of creating a fake call when setup 
connection


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3f75ba19
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3f75ba19
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3f75ba19

Branch: refs/heads/master
Commit: 3f75ba195c62b1752f4a3f6a408c74f16ea6b0d4
Parents: 28d619b
Author: zhangduo <zhang...@apache.org>
Authored: Tue May 23 10:56:22 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue May 23 15:09:08 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/BufferChain.java    |   2 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 150 ++-------------
 .../ipc/NettyRpcServerPreambleHandler.java      |  57 ++++++
 .../hbase/ipc/NettyRpcServerRequestDecoder.java |  86 +++++++++
 .../ipc/NettyRpcServerResponseEncoder.java      |  55 ++++++
 .../hadoop/hbase/ipc/NettyServerCall.java       |   8 +-
 .../hbase/ipc/NettyServerRpcConnection.java     |  90 +--------
 .../apache/hadoop/hbase/ipc/RpcResponse.java    |  33 ++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  14 --
 .../org/apache/hadoop/hbase/ipc/ServerCall.java |  29 ++-
 .../hadoop/hbase/ipc/ServerRpcConnection.java   | 185 +++++++++++--------
 .../hbase/ipc/SimpleRpcServerResponder.java     |  63 +++----
 .../hadoop/hbase/ipc/SimpleServerCall.java      |   8 +-
 .../hbase/ipc/SimpleServerRpcConnection.java    | 113 +++--------
 14 files changed, 433 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index bd0515a..5d108e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -34,7 +34,7 @@ class BufferChain {
   private int bufferOffset = 0;
   private int size;
 
-  BufferChain(ByteBuffer[] buffers) {
+  BufferChain(ByteBuffer... buffers) {
     for (ByteBuffer b : buffers) {
       this.remaining += b.remaining();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 4a4ddba..4b06fab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -18,28 +18,19 @@
 package org.apache.hadoop.hbase.ipc;
 
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.FixedLengthFrameDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
@@ -72,11 +63,11 @@ public class NettyRpcServer extends RpcServer {
 
   public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
 
-  protected final InetSocketAddress bindAddress;
+  private final InetSocketAddress bindAddress;
 
   private final CountDownLatch closed = new CountDownLatch(1);
   private final Channel serverChannel;
-  private final ChannelGroup allChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);;
+  private final ChannelGroup allChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
   public NettyRpcServer(final Server server, final String name,
       final List<BlockingServiceAndInterface> services,
@@ -107,7 +98,21 @@ public class NettyRpcServer extends RpcServer {
     bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
     bootstrap.childOption(ChannelOption.ALLOCATOR,
         PooledByteBufAllocator.DEFAULT);
-    bootstrap.childHandler(new Initializer(maxRequestSize));
+    bootstrap.childHandler(new ChannelInitializer<Channel>() {
+
+      @Override
+      protected void initChannel(Channel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
+        FixedLengthFrameDecoder preambleDecoder = new 
FixedLengthFrameDecoder(6);
+        preambleDecoder.setSingleDecode(true);
+        pipeline.addLast("preambleDecoder", preambleDecoder);
+        pipeline.addLast("preambleHandler", new 
NettyRpcServerPreambleHandler(NettyRpcServer.this));
+        pipeline.addLast("frameDecoder",
+          new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
+        pipeline.addLast("decoder", new 
NettyRpcServerRequestDecoder(allChannels, metrics));
+        pipeline.addLast("encoder", new 
NettyRpcServerResponseEncoder(metrics));
+      }
+    });
 
     try {
       serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
@@ -173,125 +178,6 @@ public class NettyRpcServer extends RpcServer {
     return ((InetSocketAddress) serverChannel.localAddress());
   }
 
-  private class Initializer extends ChannelInitializer<SocketChannel> {
-
-    final int maxRequestSize;
-
-    Initializer(int maxRequestSize) {
-      this.maxRequestSize = maxRequestSize;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel channel) throws Exception {
-      ChannelPipeline pipeline = channel.pipeline();
-      pipeline.addLast("header", new ConnectionHeaderHandler());
-      pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-          maxRequestSize, 0, 4, 0, 4, true));
-      pipeline.addLast("decoder", new MessageDecoder());
-      pipeline.addLast("encoder", new MessageEncoder());
-    }
-
-  }
-
-  private class ConnectionHeaderHandler extends ByteToMessageDecoder {
-    private NettyServerRpcConnection connection;
-
-    ConnectionHeaderHandler() {
-    }
-
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
-        List<Object> out) throws Exception {
-      if (byteBuf.readableBytes() < 6) {
-        return;
-      }
-      connection = new NettyServerRpcConnection(NettyRpcServer.this, 
ctx.channel());
-      connection.readPreamble(byteBuf);
-      ((MessageDecoder) ctx.pipeline().get("decoder"))
-          .setConnection(connection);
-      ctx.pipeline().remove(this);
-    }
-
-  }
-
-  private class MessageDecoder extends ChannelInboundHandlerAdapter {
-
-    private NettyServerRpcConnection connection;
-
-    void setConnection(NettyServerRpcConnection connection) {
-      this.connection = connection;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-      allChannels.add(ctx.channel());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connection from " + ctx.channel().remoteAddress()
-            + "; # active connections: " + getNumOpenConnections());
-      }
-      super.channelActive(ctx);
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      ByteBuf input = (ByteBuf) msg;
-      // 4 bytes length field
-      metrics.receivedBytes(input.readableBytes() + 4);
-      connection.process(input);
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      allChannels.remove(ctx.channel());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress()
-            + ". Number of active connections: " + getNumOpenConnections());
-      }
-      super.channelInactive(ctx);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-      allChannels.remove(ctx.channel());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connection from " + ctx.channel().remoteAddress()
-            + " catch unexpected exception from downstream.", e.getCause());
-      }
-      ctx.channel().close();
-    }
-
-  }
-
-  private class MessageEncoder extends ChannelOutboundHandlerAdapter {
-
-    @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) {
-      final NettyServerCall call = (NettyServerCall) msg;
-      ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
-      ctx.write(response, promise).addListener(new CallWriteListener(call));
-    }
-
-  }
-
-  private class CallWriteListener implements ChannelFutureListener {
-
-    private NettyServerCall call;
-
-    CallWriteListener(NettyServerCall call) {
-      this.call = call;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      call.done();
-      if (future.isSuccess()) {
-        metrics.sentBytes(call.response.size());
-      }
-    }
-
-  }
-
   @Override
   public void setSocketSendBufSize(int size) {
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
new file mode 100644
index 0000000..3754d44
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Handle connection preamble.
+ */
+@InterfaceAudience.Private
+class NettyRpcServerPreambleHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+
+  private final NettyRpcServer rpcServer;
+
+  public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
+    this.rpcServer = rpcServer;
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+    NettyServerRpcConnection conn = new NettyServerRpcConnection(rpcServer, 
ctx.channel());
+    ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
+    msg.readBytes(buf);
+    buf.flip();
+    if (!conn.processPreamble(buf)) {
+      conn.close();
+      return;
+    }
+    ChannelPipeline p = ctx.pipeline();
+    ((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn);
+    p.remove(this);
+    p.remove("preambleDecoder");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
new file mode 100644
index 0000000..a40e9d3
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Decoder for rpc request.
+ */
+@InterfaceAudience.Private
+class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter {
+
+  private final ChannelGroup allChannels;
+
+  private final MetricsHBaseServer metrics;
+
+  public NettyRpcServerRequestDecoder(ChannelGroup allChannels, 
MetricsHBaseServer metrics) {
+    this.allChannels = allChannels;
+    this.metrics = metrics;
+  }
+
+  private NettyServerRpcConnection connection;
+
+  void setConnection(NettyServerRpcConnection connection) {
+    this.connection = connection;
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    allChannels.add(ctx.channel());
+    if (NettyRpcServer.LOG.isDebugEnabled()) {
+      NettyRpcServer.LOG.debug("Connection from " + 
ctx.channel().remoteAddress() +
+          "; # active connections: " + (allChannels.size() - 1));
+    }
+    super.channelActive(ctx);
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+    ByteBuf input = (ByteBuf) msg;
+    // 4 bytes length field
+    metrics.receivedBytes(input.readableBytes() + 4);
+    connection.process(input);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    allChannels.remove(ctx.channel());
+    if (NettyRpcServer.LOG.isDebugEnabled()) {
+      NettyRpcServer.LOG.debug("Disconnecting client: " + 
ctx.channel().remoteAddress() +
+          ". Number of active connections: " + (allChannels.size() - 1));
+    }
+    super.channelInactive(ctx);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+    allChannels.remove(ctx.channel());
+    if (NettyRpcServer.LOG.isDebugEnabled()) {
+      NettyRpcServer.LOG.debug("Connection from " + 
ctx.channel().remoteAddress() +
+          " catch unexpected exception from downstream.",
+        e.getCause());
+    }
+    ctx.channel().close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java
new file mode 100644
index 0000000..b5b6a6b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerResponseEncoder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Encoder for {@link RpcResponse}.
+ */
+@InterfaceAudience.Private
+class NettyRpcServerResponseEncoder extends ChannelOutboundHandlerAdapter {
+
+  private final MetricsHBaseServer metrics;
+
+  NettyRpcServerResponseEncoder(MetricsHBaseServer metrics) {
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise)
+      throws Exception {
+    if (msg instanceof RpcResponse) {
+      RpcResponse resp = (RpcResponse) msg;
+      BufferChain buf = resp.getResponse();
+      ctx.write(Unpooled.wrappedBuffer(buf.getBuffers()), 
promise).addListener(f -> {
+        resp.done();
+        if (f.isSuccess()) {
+          metrics.sentBytes(buf.size());
+        }
+      });
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index 3cb9a5a..bba2536 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import io.netty.channel.ChannelFutureListener;
-
 import java.io.IOException;
 import java.net.InetAddress;
 
@@ -53,10 +51,8 @@ class NettyServerCall extends 
ServerCall<NettyServerRpcConnection> {
    */
   @Override
   public synchronized void sendResponseIfReady() throws IOException {
+    // set param null to reduce memory pressure
+    this.param = null;
     connection.channel.writeAndFlush(this);
   }
-
-  public synchronized void sendResponseIfReady(ChannelFutureListener listener) 
throws IOException {
-    connection.channel.writeAndFlush(this).addListener(listener);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index 7985295..61e12ab 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -19,30 +19,21 @@ package org.apache.hadoop.hbase.ipc;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFutureListener;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.htrace.TraceInfo;
 
 /**
@@ -64,75 +55,6 @@ class NettyServerRpcConnection extends ServerRpcConnection {
       this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
     }
     this.remotePort = inetSocketAddress.getPort();
-    this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, 
null, this, 0, null,
-        null, System.currentTimeMillis(), 0, rpcServer.reservoir, 
rpcServer.cellBlockBuilder, null);
-    this.setConnectionHeaderResponseCall = new 
NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
-        null, null, null, null, null, this, 0, null, null, 
System.currentTimeMillis(), 0,
-        rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
-    this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, 
null, null, null, null,
-        null, this, 0, null, null, System.currentTimeMillis(), 0, 
rpcServer.reservoir,
-        rpcServer.cellBlockBuilder, null);
-  }
-
-  void readPreamble(ByteBuf buffer) throws IOException {
-    byte[] rpcHead = { buffer.readByte(), buffer.readByte(), 
buffer.readByte(), buffer.readByte() };
-    if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
-      doBadPreambleHandling("Expected HEADER=" + 
Bytes.toStringBinary(HConstants.RPC_HEADER) +
-          " but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + 
toString());
-      return;
-    }
-    // Now read the next two bytes, the version and the auth to use.
-    int version = buffer.readByte();
-    byte authbyte = buffer.readByte();
-    this.authMethod = AuthMethod.valueOf(authbyte);
-    if (version != NettyRpcServer.CURRENT_VERSION) {
-      String msg = getFatalConnectionString(version, authbyte);
-      doBadPreambleHandling(msg, new WrongVersionException(msg));
-      return;
-    }
-    if (authMethod == null) {
-      String msg = getFatalConnectionString(version, authbyte);
-      doBadPreambleHandling(msg, new BadAuthException(msg));
-      return;
-    }
-    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-      if (this.rpcServer.allowFallbackToSimpleAuth) {
-        this.rpcServer.metrics.authenticationFallback();
-        authenticatedWithFallback = true;
-      } else {
-        AccessDeniedException ae = new AccessDeniedException("Authentication 
is required");
-        this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, 
ae.getMessage());
-        ((NettyServerCall) 
authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE);
-        return;
-      }
-    }
-    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
-      doRawSaslReply(SaslStatus.SUCCESS, new 
IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
-        null);
-      authMethod = AuthMethod.SIMPLE;
-      // client has already sent the initial Sasl message and we
-      // should ignore it. Both client and server should fall back
-      // to simple auth from now on.
-      skipInitialSaslHandshake = true;
-    }
-    if (authMethod != AuthMethod.SIMPLE) {
-      useSasl = true;
-    }
-    connectionPreambleRead = true;
-  }
-
-  private void doBadPreambleHandling(final String msg) throws IOException {
-    doBadPreambleHandling(msg, new FatalConnectionException(msg));
-  }
-
-  private void doBadPreambleHandling(final String msg, final Exception e) 
throws IOException {
-    NettyRpcServer.LOG.warn(msg);
-    NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, 
null, this, -1, null,
-        null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
-        this.rpcServer.cellBlockBuilder, null);
-    this.rpcServer.setupResponse(null, fakeCall, e, msg);
-    // closes out the connection.
-    fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
   }
 
   void process(final ByteBuf buf) throws IOException, InterruptedException {
@@ -145,9 +67,8 @@ class NettyServerRpcConnection extends ServerRpcConnection {
       };
       process(new SingleByteBuff(buf.nioBuffer()));
     } else {
-      byte[] data = new byte[buf.readableBytes()];
-      buf.readBytes(data, 0, data.length);
-      ByteBuffer connectionHeader = ByteBuffer.wrap(data);
+      ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
+      buf.readBytes(connectionHeader);
       buf.release();
       process(connectionHeader);
     }
@@ -203,4 +124,9 @@ class NettyServerRpcConnection extends ServerRpcConnection {
         remoteAddress, System.currentTimeMillis(), timeout, 
this.rpcServer.reservoir,
         this.rpcServer.cellBlockBuilder, reqCleanup);
   }
+
+  @Override
+  protected void doRespond(RpcResponse resp) {
+    channel.writeAndFlush(resp);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java
new file mode 100644
index 0000000..a8c1354
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcResponse.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An interface representing the response of an rpc call.
+ */
+@InterfaceAudience.Private
+interface RpcResponse {
+
+  BufferChain getResponse();
+
+  default void done() {
+    // nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d68a05e..f899867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -22,7 +22,6 @@ import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -341,19 +340,6 @@ public abstract class RpcServer implements 
RpcServerInterface,
     }
   }
 
-  /**
-   * Setup response for the RPC Call.
-   * @param response buffer to serialize the response into
-   * @param call {@link ServerCall} to which we are setting up the response
-   * @param error error message, if the call failed
-   * @throws IOException
-   */
-  protected void setupResponse(ByteArrayOutputStream response, ServerCall<?> 
call, Throwable t,
-      String error) throws IOException {
-    if (response != null) response.reset();
-    call.setResponse(null, null, t, error);
-  }
-
   Configuration getConf() {
     return conf;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 15fe3e6..e4bef98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -51,7 +51,7 @@ import org.apache.htrace.TraceInfo;
  * the result.
  */
 @InterfaceAudience.Private
-abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
+abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, 
RpcResponse {
 
   protected final int id;                             // the client's call id
   protected final BlockingService service;
@@ -127,7 +127,8 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"IS2_INCONSISTENT_SYNC",
       justification = "Presume the lock on processing request held by caller 
is protection enough")
-  void done() {
+  @Override
+  public void done() {
     if (this.cellBlockStream != null) {
       // This will return back the BBs which we got from pool.
       this.cellBlockStream.releaseResources();
@@ -178,18 +179,6 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
         " deadline: " + deadline;
   }
 
-  protected synchronized void setSaslTokenResponse(ByteBuffer response) {
-    ByteBuffer[] responseBufs = new ByteBuffer[1];
-    responseBufs[0] = response;
-    this.response = new BufferChain(responseBufs);
-  }
-
-  protected synchronized void setConnectionHeaderResponse(ByteBuffer response) 
{
-    ByteBuffer[] responseBufs = new ByteBuffer[1];
-    responseBufs[0] = response;
-    this.response = new BufferChain(responseBufs);
-  }
-
   @Override
   public synchronized void setResponse(Message m, final CellScanner cells,
       Throwable t, String errorMsg) {
@@ -268,7 +257,7 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
     }
   }
 
-  protected void setExceptionResponse(Throwable t, String errorMsg,
+  static void setExceptionResponse(Throwable t, String errorMsg,
       ResponseHeader.Builder headerBuilder) {
     ExceptionResponse.Builder exceptionBuilder = 
ExceptionResponse.newBuilder();
     exceptionBuilder.setExceptionClassName(t.getClass().getName());
@@ -286,7 +275,7 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
     headerBuilder.setException(exceptionBuilder.build());
   }
 
-  protected ByteBuffer createHeaderAndMessageBytes(Message result, Message 
header,
+  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
       int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
     // Organize the response as a set of bytebuffers rather than collect it 
all together inside
     // one big byte array; save on allocations.
@@ -336,7 +325,7 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
     }
   }
 
-  private void writeToCOS(Message result, Message header, int totalSize, 
ByteBuffer pbBuf)
+  private static void writeToCOS(Message result, Message header, int 
totalSize, ByteBuffer pbBuf)
       throws IOException {
     ByteBufferUtils.putInt(pbBuf, totalSize);
     // create COS that works on BB
@@ -351,7 +340,7 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
     cos.checkNoSpaceLeft();
   }
 
-  private ByteBuffer createHeaderAndMessageBytes(Message result, Message 
header,
+  private static ByteBuffer createHeaderAndMessageBytes(Message result, 
Message header,
       int totalSize, int totalPBSize) throws IOException {
     ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
     writeToCOS(result, header, totalSize, pbBuf);
@@ -523,4 +512,8 @@ abstract class ServerCall<T extends ServerRpcConnection> 
implements RpcCall {
     return tinfo;
   }
 
+  @Override
+  public synchronized BufferChain getResponse() {
+    return response;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index d4ab95c..c652afa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
+
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -68,9 +69,11 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -89,8 +92,6 @@ import org.apache.htrace.TraceInfo;
 abstract class ServerRpcConnection implements Closeable {
   /**  */
   protected final RpcServer rpcServer;
-  // If initial preamble with version and magic has been read or not.
-  protected boolean connectionPreambleRead = false;
   // If the connection header has been read or not.
   protected boolean connectionHeaderRead = false;
 
@@ -124,17 +125,6 @@ abstract class ServerRpcConnection implements Closeable {
   protected CryptoAES cryptoAES;
   protected boolean useWrap = false;
   protected boolean useCryptoAesWrap = false;
-  // Fake 'call' for failed authorization response
-  protected static final int AUTHORIZATION_FAILED_CALLID = -1;
-  protected ServerCall<?> authFailedCall;
-  protected ByteArrayOutputStream authFailedResponse =
-      new ByteArrayOutputStream();
-  // Fake 'call' for SASL context setup
-  protected static final int SASL_CALLID = -33;
-  protected ServerCall<?> saslCall;
-  // Fake 'call' for connection header response
-  protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
-  protected ServerCall<?> setConnectionHeaderResponseCall;
 
   // was authentication allowed with a fallback to simple auth
   protected boolean authenticatedWithFallback;
@@ -340,15 +330,13 @@ abstract class ServerRpcConnection implements Closeable {
   /**
    * No protobuf encoding of raw sasl messages
    */
-  protected void doRawSaslReply(SaslStatus status, Writable rv,
+  protected final void doRawSaslReply(SaslStatus status, Writable rv,
       String errorClass, String error) throws IOException {
-    ByteBufferOutputStream saslResponse = null;
-    DataOutputStream out = null;
-    try {
-      // In my testing, have noticed that sasl messages are usually
-      // in the ballpark of 100-200. That's why the initial capacity is 256.
-      saslResponse = new ByteBufferOutputStream(256);
-      out = new DataOutputStream(saslResponse);
+    BufferChain bc;
+    // In my testing, have noticed that sasl messages are usually
+    // in the ballpark of 100-200. That's why the initial capacity is 256.
+    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
+        DataOutputStream  out = new DataOutputStream(saslResponse)) {
       out.writeInt(status.state); // write status
       if (status == SaslStatus.SUCCESS) {
         rv.write(out);
@@ -356,16 +344,9 @@ abstract class ServerRpcConnection implements Closeable {
         WritableUtils.writeString(out, errorClass);
         WritableUtils.writeString(out, error);
       }
-      saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
-      saslCall.sendResponseIfReady();
-    } finally {
-      if (saslResponse != null) {
-        saslResponse.close();
-      }
-      if (out != null) {
-        out.close();
-      }
+      bc = new BufferChain(saslResponse.getByteBuffer());
     }
+    doRespond(() -> bc);
   }
 
   public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
@@ -481,8 +462,7 @@ abstract class ServerRpcConnection implements Closeable {
     }
   }
 
-  private void processUnwrappedData(byte[] inBuf) throws IOException,
-  InterruptedException {
+  private void processUnwrappedData(byte[] inBuf) throws IOException, 
InterruptedException {
     ReadableByteChannel ch = Channels.newChannel(new 
ByteArrayInputStream(inBuf));
     // Read all RPCs contained in the inBuf, even partial ones
     while (true) {
@@ -536,7 +516,7 @@ abstract class ServerRpcConnection implements Closeable {
     }
   }
 
-  protected boolean authorizeConnection() throws IOException {
+  private boolean authorizeConnection() throws IOException {
     try {
       // If auth method is DIGEST, the token was obtained by the
       // real user for the effective user, therefore not required to
@@ -553,16 +533,14 @@ abstract class ServerRpcConnection implements Closeable {
         RpcServer.LOG.debug("Connection authorization failed: " + 
ae.getMessage(), ae);
       }
       this.rpcServer.metrics.authorizationFailure();
-      this.rpcServer.setupResponse(authFailedResponse, authFailedCall,
-        new AccessDeniedException(ae), ae.getMessage());
-      authFailedCall.sendResponseIfReady();
+      doRespond(getErrorResponse(ae.getMessage(), new 
AccessDeniedException(ae)));
       return false;
     }
     return true;
   }
 
   // Reads the connection header following version
-  protected void processConnectionHeader(ByteBuff buf) throws IOException {
+  private void processConnectionHeader(ByteBuff buf) throws IOException {
     if (buf.hasArray()) {
       this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
     } else {
@@ -630,6 +608,9 @@ abstract class ServerRpcConnection implements Closeable {
     }
   }
 
+  /**
+   * Send the response for connection header
+   */
   private void 
responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
       throws FatalConnectionException {
     // Response the connection header if Crypto AES is enabled
@@ -640,38 +621,21 @@ abstract class ServerRpcConnection implements Closeable {
       byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
       Bytes.putBytes(unwrapped, 0, 
Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
       Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, 
connectionHeaderResBytes.length);
-
-      doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, 
unwrapped.length));
+      byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
+      BufferChain bc;
+      try (ByteBufferOutputStream response = new 
ByteBufferOutputStream(wrapped.length + 4);
+          DataOutputStream out = new DataOutputStream(response)) {
+        out.writeInt(wrapped.length);
+        out.write(wrapped);
+        bc = new BufferChain(response.getByteBuffer());
+      }
+      doRespond(() -> bc);
     } catch (IOException ex) {
       throw new UnsupportedCryptoException(ex.getMessage(), ex);
     }
   }
 
-  /**
-   * Send the response for connection header
-   */
-  private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
-      throws IOException {
-    ByteBufferOutputStream response = null;
-    DataOutputStream out = null;
-    try {
-      response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
-      out = new DataOutputStream(response);
-      out.writeInt(wrappedCipherMetaData.length);
-      out.write(wrappedCipherMetaData);
-
-      setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
-          .getByteBuffer());
-      setConnectionHeaderResponseCall.sendResponseIfReady();
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-      if (response != null) {
-        response.close();
-      }
-    }
-  }
+  protected abstract void doRespond(RpcResponse resp) throws IOException;
 
   /**
    * @param buf
@@ -709,14 +673,14 @@ abstract class ServerRpcConnection implements Closeable {
     // Enforcing the call queue size, this triggers a retry in the client
     // This is a bit late to be doing this check - we have already read in the
     // total request.
-    if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > 
this.rpcServer.maxQueueSizeInBytes) {
+    if ((totalRequestSize +
+        this.rpcServer.callQueueSizeInBytes.sum()) > 
this.rpcServer.maxQueueSizeInBytes) {
       final ServerCall<?> callTooBig = createCall(id, this.service, null, 
null, null, null,
         totalRequestSize, null, null, 0, this.callCleanup);
-      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
       this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-      this.rpcServer.setupResponse(responseBuffer, callTooBig, 
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
-          "Call queue is full on " + this.rpcServer.server.getServerName()
-              + ", is hbase.ipc.server.max.callqueue.size too small?");
+      callTooBig.setResponse(null, null,  
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+        "Call queue is full on " + this.rpcServer.server.getServerName() +
+        ", is hbase.ipc.server.max.callqueue.size too small?");
       callTooBig.sendResponseIfReady();
       return;
     }
@@ -773,11 +737,9 @@ abstract class ServerRpcConnection implements Closeable {
         t = new DoNotRetryIOException(t);
       }
 
-      final ServerCall<?> readParamsFailedCall = createCall(id, this.service, 
null, null, null, null,
+      ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, 
null, null, null,
         totalRequestSize, null, null, 0, this.callCleanup);
-      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-      this.rpcServer.setupResponse(responseBuffer, readParamsFailedCall, t,
-          msg + "; " + t.getMessage());
+      readParamsFailedCall.setResponse(null, null, t, msg + "; " + 
t.getMessage());
       readParamsFailedCall.sendResponseIfReady();
       return;
     }
@@ -794,16 +756,81 @@ abstract class ServerRpcConnection implements Closeable {
 
     if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, 
call))) {
       this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
-
-      ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
       this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-      this.rpcServer.setupResponse(responseBuffer, call, 
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
-          "Call queue is full on " + this.rpcServer.server.getServerName()
-              + ", too many items queued ?");
+      call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+        "Call queue is full on " + this.rpcServer.server.getServerName() +
+            ", too many items queued ?");
       call.sendResponseIfReady();
     }
   }
 
+  protected final RpcResponse getErrorResponse(String msg, Exception e) throws 
IOException {
+    ResponseHeader.Builder headerBuilder = 
ResponseHeader.newBuilder().setCallId(-1);
+    ServerCall.setExceptionResponse(e, msg, headerBuilder);
+    ByteBuffer headerBuf =
+        ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, 
null);
+    BufferChain buf = new BufferChain(headerBuf);
+    return () -> buf;
+  }
+
+  private void doBadPreambleHandling(String msg) throws IOException {
+    doBadPreambleHandling(msg, new FatalConnectionException(msg));
+  }
+
+  private void doBadPreambleHandling(String msg, Exception e) throws 
IOException {
+    SimpleRpcServer.LOG.warn(msg);
+    doRespond(getErrorResponse(msg, e));
+  }
+
+  protected final boolean processPreamble(ByteBuffer preambleBuffer) throws 
IOException {
+    assert preambleBuffer.remaining() == 6;
+    for (int i = 0; i < RPC_HEADER.length; i++) {
+      if (RPC_HEADER[i] != preambleBuffer.get()) {
+        doBadPreambleHandling(
+          "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but 
received HEADER=" +
+              Bytes.toStringBinary(preambleBuffer.array(), 0, 
RPC_HEADER.length) + " from " +
+              toString());
+        return false;
+      }
+    }
+    int version = preambleBuffer.get() & 0xFF;
+    byte authbyte = preambleBuffer.get();
+    this.authMethod = AuthMethod.valueOf(authbyte);
+    if (version != SimpleRpcServer.CURRENT_VERSION) {
+      String msg = getFatalConnectionString(version, authbyte);
+      doBadPreambleHandling(msg, new WrongVersionException(msg));
+      return false;
+    }
+    if (authMethod == null) {
+      String msg = getFatalConnectionString(version, authbyte);
+      doBadPreambleHandling(msg, new BadAuthException(msg));
+      return false;
+    }
+    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+      if (this.rpcServer.allowFallbackToSimpleAuth) {
+        this.rpcServer.metrics.authenticationFallback();
+        authenticatedWithFallback = true;
+      } else {
+        AccessDeniedException ae = new AccessDeniedException("Authentication 
is required");
+        doRespond(getErrorResponse(ae.getMessage(), ae));
+        return false;
+      }
+    }
+    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+      doRawSaslReply(SaslStatus.SUCCESS, new 
IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
+        null);
+      authMethod = AuthMethod.SIMPLE;
+      // client has already sent the initial Sasl message and we
+      // should ignore it. Both client and server should fall back
+      // to simple auth from now on.
+      skipInitialSaslHandshake = true;
+    }
+    if (authMethod != AuthMethod.SIMPLE) {
+      useSasl = true;
+    }
+    return true;
+  }
+
   public abstract boolean isConnectionOpen();
 
   public abstract ServerCall<?> createCall(int id, BlockingService service, 
MethodDescriptor md,

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
index 5f072a9..c3f3f5d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
@@ -38,11 +38,11 @@ import org.apache.hadoop.util.StringUtils;
  */
 @InterfaceAudience.Private
 class SimpleRpcServerResponder extends Thread {
-  /**  */
+
   private final SimpleRpcServer simpleRpcServer;
   private final Selector writeSelector;
   private final Set<SimpleServerRpcConnection> writingCons =
-      Collections.newSetFromMap(new 
ConcurrentHashMap<SimpleServerRpcConnection, Boolean>());
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
 
   SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException 
{
     this.simpleRpcServer = simpleRpcServer;
@@ -175,9 +175,9 @@ class SimpleRpcServerResponder extends Thread {
         if (connection == null) {
           throw new IllegalStateException("Coding error: SelectionKey key 
without attachment.");
         }
-        SimpleServerCall call = connection.responseQueue.peekFirst();
-        if (call != null && now > call.lastSentTime + 
this.simpleRpcServer.purgeTimeout) {
-          conWithOldCalls.add(call.getConnection());
+        if (connection.lastSentTime > 0 &&
+            now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout) 
{
+          conWithOldCalls.add(connection);
         }
       }
     }
@@ -217,35 +217,37 @@ class SimpleRpcServerResponder extends Thread {
   /**
    * Process the response for this call. You need to have the lock on
    * {@link 
org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
-   * @param call the call
    * @return true if we proceed the call fully, false otherwise.
    * @throws IOException
    */
-  boolean processResponse(final SimpleServerCall call) throws IOException {
+  private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse 
resp)
+      throws IOException {
     boolean error = true;
+    BufferChain buf = resp.getResponse();
     try {
       // Send as much data as we can in the non-blocking fashion
       long numBytes =
-          this.simpleRpcServer.channelWrite(call.getConnection().channel, 
call.response);
+          this.simpleRpcServer.channelWrite(conn.channel, buf);
       if (numBytes < 0) {
-        throw new HBaseIOException(
-            "Error writing on the socket " + "for the call:" + 
call.toShortString());
+        throw new HBaseIOException("Error writing on the socket " + conn);
       }
       error = false;
     } finally {
       if (error) {
-        SimpleRpcServer.LOG.debug(getName() + call.toShortString() + ": output 
error -- closing");
+        SimpleRpcServer.LOG.debug(conn + ": output error -- closing");
         // We will be closing this connection itself. Mark this call as done 
so that all the
         // buffer(s) it got from pool can get released
-        call.done();
-        this.simpleRpcServer.closeConnection(call.getConnection());
+        resp.done();
+        this.simpleRpcServer.closeConnection(conn);
       }
     }
 
-    if (!call.response.hasRemaining()) {
-      call.done();
+    if (!buf.hasRemaining()) {
+      resp.done();
       return true;
     } else {
+      // set the serve time when the response has to be sent later
+      conn.lastSentTime = System.currentTimeMillis();
       return false; // Socket can't take more, we will have to come back.
     }
   }
@@ -263,12 +265,12 @@ class SimpleRpcServerResponder extends Thread {
     try {
       for (int i = 0; i < 20; i++) {
         // protection if some handlers manage to need all the responder
-        SimpleServerCall call = connection.responseQueue.pollFirst();
-        if (call == null) {
+        RpcResponse resp = connection.responseQueue.pollFirst();
+        if (resp == null) {
           return true;
         }
-        if (!processResponse(call)) {
-          connection.responseQueue.addFirst(call);
+        if (!processResponse(connection, resp)) {
+          connection.responseQueue.addFirst(resp);
           return false;
         }
       }
@@ -282,35 +284,30 @@ class SimpleRpcServerResponder extends Thread {
   //
   // Enqueue a response from the application.
   //
-  void doRespond(SimpleServerCall call) throws IOException {
+  void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws 
IOException {
     boolean added = false;
-
     // If there is already a write in progress, we don't wait. This allows to 
free the handlers
     // immediately for other tasks.
-    if (call.getConnection().responseQueue.isEmpty() &&
-        call.getConnection().responseWriteLock.tryLock()) {
+    if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) {
       try {
-        if (call.getConnection().responseQueue.isEmpty()) {
+        if (conn.responseQueue.isEmpty()) {
           // If we're alone, we can try to do a direct call to the socket. It's
-          // an optimisation to save on context switches and data transfer 
between cores..
-          if (processResponse(call)) {
+          // an optimization to save on context switches and data transfer 
between cores..
+          if (processResponse(conn, resp)) {
             return; // we're done.
           }
           // Too big to fit, putting ahead.
-          call.getConnection().responseQueue.addFirst(call);
+          conn.responseQueue.addFirst(resp);
           added = true; // We will register to the selector later, outside of 
the lock.
         }
       } finally {
-        call.getConnection().responseWriteLock.unlock();
+        conn.responseWriteLock.unlock();
       }
     }
 
     if (!added) {
-      call.getConnection().responseQueue.addLast(call);
+      conn.responseQueue.addLast(resp);
     }
-    call.responder.registerForWrite(call.getConnection());
-
-    // set the serve time when the response has to be sent later
-    call.lastSentTime = System.currentTimeMillis();
+    registerForWrite(conn);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index af575ea..080c4ba 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -37,8 +37,6 @@ import org.apache.htrace.TraceInfo;
 @InterfaceAudience.Private
 class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
 
-  long lastSentTime;
-
   final SimpleRpcServerResponder responder;
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"NP_NULL_ON_SOME_PATH",
@@ -59,7 +57,7 @@ class SimpleServerCall extends 
ServerCall<SimpleServerRpcConnection> {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"IS2_INCONSISTENT_SYNC",
       justification = "Presume the lock on processing request held by caller 
is protection enough")
   @Override
-  void done() {
+  public void done() {
     super.done();
     this.getConnection().decRpcCount(); // Say that we're done with this call.
   }
@@ -68,10 +66,10 @@ class SimpleServerCall extends 
ServerCall<SimpleServerRpcConnection> {
   public synchronized void sendResponseIfReady() throws IOException {
     // set param null to reduce memory pressure
     this.param = null;
-    this.responder.doRespond(this);
+    this.responder.doRespond(getConnection(), this);
   }
 
   SimpleServerRpcConnection getConnection() {
-    return (SimpleServerRpcConnection) this.connection;
+    return this.connection;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f75ba19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index b2507d8..2327c09 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -24,7 +24,6 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SocketChannel;
-import java.util.Arrays;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
@@ -32,26 +31,19 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.htrace.TraceInfo;
 
 /** Reads calls from a connection and queues them for handling. */
@@ -64,13 +56,17 @@ class SimpleServerRpcConnection extends ServerRpcConnection 
{
   private ByteBuff data;
   private ByteBuffer dataLengthBuffer;
   private ByteBuffer preambleBuffer;
-  protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue =
-      new ConcurrentLinkedDeque<>();
-  final Lock responseWriteLock = new ReentrantLock();
   private final LongAdder rpcCount = new LongAdder(); // number of outstanding 
rpcs
   private long lastContact;
   private final Socket socket;
-  private final SimpleRpcServerResponder responder;
+  final SimpleRpcServerResponder responder;
+
+  // If initial preamble with version and magic has been read or not.
+  private boolean connectionPreambleRead = false;
+
+  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new 
ConcurrentLinkedDeque<>();
+  final Lock responseWriteLock = new ReentrantLock();
+  long lastSentTime = -1L;
 
   public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel 
channel,
       long lastContact) {
@@ -95,15 +91,6 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
           "Connection: unable to set socket send buffer size to " + 
rpcServer.socketSendBufferSize);
       }
     }
-    this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, 
null, this, 0, null,
-        null, System.currentTimeMillis(), 0, rpcServer.reservoir, 
rpcServer.cellBlockBuilder, null,
-        rpcServer.responder);
-    this.setConnectionHeaderResponseCall = new 
SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
-        null, null, null, null, null, this, 0, null, null, 
System.currentTimeMillis(), 0,
-        rpcServer.reservoir, rpcServer.cellBlockBuilder, null, 
rpcServer.responder);
-    this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, 
null, null, null, null,
-        null, this, 0, null, null, System.currentTimeMillis(), 0, 
rpcServer.reservoir,
-        rpcServer.cellBlockBuilder, null, rpcServer.responder);
     this.responder = rpcServer.responder;
   }
 
@@ -138,49 +125,9 @@ class SimpleServerRpcConnection extends 
ServerRpcConnection {
     if (count < 0 || preambleBuffer.remaining() > 0) {
       return count;
     }
-    // Check for 'HBas' magic.
     preambleBuffer.flip();
-    for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
-      if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
-        return doBadPreambleHandling("Expected HEADER=" +
-            Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received 
HEADER=" +
-            Bytes.toStringBinary(preambleBuffer.array(), 0, 
HConstants.RPC_HEADER.length) +
-            " from " + toString());
-      }
-    }
-    int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
-    byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
-    this.authMethod = AuthMethod.valueOf(authbyte);
-    if (version != SimpleRpcServer.CURRENT_VERSION) {
-      String msg = getFatalConnectionString(version, authbyte);
-      return doBadPreambleHandling(msg, new WrongVersionException(msg));
-    }
-    if (authMethod == null) {
-      String msg = getFatalConnectionString(version, authbyte);
-      return doBadPreambleHandling(msg, new BadAuthException(msg));
-    }
-    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-      if (this.rpcServer.allowFallbackToSimpleAuth) {
-        this.rpcServer.metrics.authenticationFallback();
-        authenticatedWithFallback = true;
-      } else {
-        AccessDeniedException ae = new AccessDeniedException("Authentication 
is required");
-        this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, 
ae.getMessage());
-        authFailedCall.sendResponseIfReady();
-        throw ae;
-      }
-    }
-    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
-      doRawSaslReply(SaslStatus.SUCCESS, new 
IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
-        null);
-      authMethod = AuthMethod.SIMPLE;
-      // client has already sent the initial Sasl message and we
-      // should ignore it. Both client and server should fall back
-      // to simple auth from now on.
-      skipInitialSaslHandshake = true;
-    }
-    if (authMethod != AuthMethod.SIMPLE) {
-      useSasl = true;
+    if (!processPreamble(preambleBuffer)) {
+      return -1;
     }
     preambleBuffer = null; // do not need it anymore
     connectionPreambleRead = true;
@@ -272,19 +219,15 @@ class SimpleServerRpcConnection extends 
ServerRpcConnection {
           // Otherwise, throw a DoNotRetryIOException.
           if 
(VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
             RequestTooBigException.MAJOR_VERSION, 
RequestTooBigException.MINOR_VERSION)) {
-            this.rpcServer.setupResponse(null, reqTooBig, 
SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION,
-              msg);
+            reqTooBig.setResponse(null, null, 
SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg);
           } else {
-            this.rpcServer.setupResponse(null, reqTooBig, new 
DoNotRetryIOException(), msg);
-          }
-          // We are going to close the connection, make sure we process the 
response
-          // before that. In rare case when this fails, we still close the 
connection.
-          responseWriteLock.lock();
-          try {
-            this.responder.processResponse(reqTooBig);
-          } finally {
-            responseWriteLock.unlock();
+            reqTooBig.setResponse(null, null, new DoNotRetryIOException(), 
msg);
           }
+          // In most cases we will write out the response directly. If not, it is still OK to
+          // just close the connection without writing out the reqTooBig response. Do not try to
+          // write it out directly here, as that would cause a deserialization error if the
+          // connection is slow and there is a half-written response in the queue.
+          reqTooBig.sendResponseIfReady();
         }
         // Close the connection
         return -1;
@@ -365,21 +308,6 @@ class SimpleServerRpcConnection extends 
ServerRpcConnection {
     }
   }
 
-  private int doBadPreambleHandling(final String msg) throws IOException {
-    return doBadPreambleHandling(msg, new FatalConnectionException(msg));
-  }
-
-  private int doBadPreambleHandling(final String msg, final Exception e) 
throws IOException {
-    SimpleRpcServer.LOG.warn(msg);
-    SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, 
null, null, this, -1,
-        null, null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
-        this.rpcServer.cellBlockBuilder, null, responder);
-    this.rpcServer.setupResponse(null, fakeCall, e, msg);
-    this.responder.doRespond(fakeCall);
-    // Returning -1 closes out the connection.
-    return -1;
-  }
-
   @Override
   public synchronized void close() {
     disposeSasl();
@@ -421,4 +349,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection 
{
         remoteAddress, System.currentTimeMillis(), timeout, 
this.rpcServer.reservoir,
         this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
   }
+
+  @Override
+  protected void doRespond(RpcResponse resp) throws IOException {
+    responder.doRespond(this, resp);
+  }
 }
\ No newline at end of file

Reply via email to