HBASE-15202 Reduce garbage while setting response (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b33a740 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b33a740 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b33a740 Branch: refs/heads/hbase-12439 Commit: 7b33a740b10b05b50f8e9d3b2a1ef37593cb6eb3 Parents: f5fba2b Author: ramkrishna <ramkrishna.s.vasude...@gmail.com> Authored: Thu Feb 4 23:23:31 2016 +0530 Committer: ramkrishna <ramkrishna.s.vasude...@gmail.com> Committed: Thu Feb 4 23:23:31 2016 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 51 +++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7b33a740/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a9c64a3..98669e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -131,6 +131,7 @@ import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -427,14 +428,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } Message header = headerBuilder.build(); - // Organize the response as a set of bytebuffers rather than collect it all together inside - // one big byte array; save on allocations. - ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); - ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result); - int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) + - (this.cellBlock == null? 0: this.cellBlock.limit()); - ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize)); - bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock); + byte[] b = createHeaderAndMessageBytes(result, header); + + bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock); + if (connection.useWrap) { bc = wrapWithSasl(bc); } @@ -454,6 +451,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + private byte[] createHeaderAndMessageBytes(Message result, Message header) + throws IOException { + // Organize the response as a set of bytebuffers rather than collect it all together inside + // one big byte array; save on allocations. + int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, + resultVintSize = 0; + if (header != null) { + headerSerializedSize = header.getSerializedSize(); + headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize); + } + if (result != null) { + resultSerializedSize = result.getSerializedSize(); + resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize); + } + // calculate the total size + int totalSize = headerSerializedSize + headerVintSize + + (resultSerializedSize + resultVintSize) + + (this.cellBlock == null ? 0 : this.cellBlock.limit()); + // The byte[] should also hold the totalSize of the header, message and the cellblock + byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT]; + // The RpcClient expects the int to be in a format that code be decoded by + // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) + // form of writing int. + Bytes.putInt(b, 0, totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, + b.length - Bytes.SIZEOF_INT); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + return b; + } + private BufferChain wrapWithSasl(BufferChain bc) throws IOException { if (!this.connection.useSasl) return bc;