HADOOP-13549. Eliminate intermediate buffer for server-side PB encoding. 
Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39d1b1d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39d1b1d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39d1b1d7

Branch: refs/heads/YARN-3368
Commit: 39d1b1d747b1e325792b897b3264272f32b756a9
Parents: e6fcfe2
Author: Kihwal Lee <kih...@apache.org>
Authored: Tue Sep 6 10:01:01 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Tue Sep 6 10:02:42 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Server.java | 52 ++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/39d1b1d7/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index ea4f533..531d574 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -119,6 +119,7 @@ import org.apache.htrace.core.Tracer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
@@ -2752,24 +2753,69 @@ public abstract class Server {
 
   private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
+    final byte[] response;
+    if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
+      response = setupResponseForProtobuf(header, rv);
+    } else {
+      response = setupResponseForWritable(header, rv);
+    }
+    if (response.length > maxRespSize) {
+      LOG.warn("Large response size " + response.length + " for call "
+          + call.toString());
+    }
+    call.setResponse(ByteBuffer.wrap(response));
+  }
+
+  private byte[] setupResponseForWritable(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
     ResponseBuffer buf = responseBuffer.get().reset();
     try {
       RpcWritable.wrap(header).writeTo(buf);
       if (rv != null) {
         RpcWritable.wrap(rv).writeTo(buf);
       }
-      call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+      return buf.toByteArray();
     } finally {
       // Discard a large buf and reset it back to smaller size
       // to free up heap.
       if (buf.capacity() > maxRespSize) {
-        LOG.warn("Large response size " + buf.size() + " for call "
-            + call.toString());
         buf.setCapacity(INITIAL_RESP_BUF_SIZE);
       }
     }
   }
 
+
+  // writing to a pre-allocated array is the most efficient way to construct
+  // a protobuf response.
+  private byte[] setupResponseForProtobuf(
+      RpcResponseHeaderProto header, Writable rv) throws IOException {
+    Message payload = (rv != null)
+        ? ((RpcWritable.ProtobufWrapper)rv).getMessage() : null;
+    int length = getDelimitedLength(header);
+    if (payload != null) {
+      length += getDelimitedLength(payload);
+    }
+    byte[] buf = new byte[length + 4];
+    CodedOutputStream cos = CodedOutputStream.newInstance(buf);
+    // the stream only supports little endian ints
+    cos.writeRawByte((byte)((length >>> 24) & 0xFF));
+    cos.writeRawByte((byte)((length >>> 16) & 0xFF));
+    cos.writeRawByte((byte)((length >>>  8) & 0xFF));
+    cos.writeRawByte((byte)((length >>>  0) & 0xFF));
+    cos.writeRawVarint32(header.getSerializedSize());
+    header.writeTo(cos);
+    if (payload != null) {
+      cos.writeRawVarint32(payload.getSerializedSize());
+      payload.writeTo(cos);
+    }
+    return buf;
+  }
+
+  private static int getDelimitedLength(Message message) {
+    int length = message.getSerializedSize();
+    return length + CodedOutputStream.computeRawVarint32Size(length);
+  }
+
   /**
    * Setup response for the IPC Call on Fatal Error from a 
    * client that is using old version of Hadoop.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to