Repository: hbase
Updated Branches:
  refs/heads/0.94 a0b773824 -> 25d4d266f


HBASE-12968 [0.94]SecureServer should not ignore CallQueueSize. (hongyu bi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25d4d266
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25d4d266
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25d4d266

Branch: refs/heads/0.94
Commit: 25d4d266f3c07e2af3663b8e806a0bb6b015522e
Parents: a0b7738
Author: Lars Hofhansl <la...@apache.org>
Authored: Sat Feb 7 23:39:08 2015 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Sat Feb 7 23:39:08 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/SecureServer.java | 18 ++++++++++++++++--
 .../org/apache/hadoop/hbase/ipc/HBaseServer.java  |  4 ++--
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/25d4d266/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
----------------------------------------------------------------------
diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
index e630125..c5fe8c1 100644
--- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
+++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java
@@ -612,15 +612,29 @@ public abstract class SecureServer extends HBaseServer {
       DataInputStream dis =
         new DataInputStream(new ByteArrayInputStream(buf));
       int id = dis.readInt();                    // try to read an id
+      long callSize = buf.length;
 
       if (LOG.isTraceEnabled()) {
         LOG.trace(" got #" + id);
       }
 
+      // Enforcing the call queue size, this triggers a retry in the client
+      if ((callSize + callQueueSize.get()) > maxQueueSize) {
+        final SecureCall callTooBig =
+          new SecureCall(id, null, this, responder, callSize);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+        setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
+            IOException.class.getName(),
+            "Call queue is full, is ipc.server.max.callqueue.size too small?");
+        responder.doRespond(callTooBig);
+        return;
+      }
+
       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);
 
-      SecureCall call = new SecureCall(id, param, this, responder, buf.length);
+      SecureCall call = new SecureCall(id, param, this, responder, callSize);
+      callQueueSize.add(callSize);
 
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
@@ -757,4 +771,4 @@ public abstract class SecureServer extends HBaseServer {
           protocol, getConf(), addr);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/25d4d266/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index 261ce4a..417e2a2 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -204,7 +204,7 @@ public abstract class HBaseServer implements RpcServer {
   protected Configuration conf;
 
   private int maxQueueLength;
-  private int maxQueueSize;
+  protected int maxQueueSize;
   protected int socketSendBufferSize;
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -1606,7 +1606,7 @@ public abstract class HBaseServer implements RpcServer {
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream response,
+  protected void setupResponse(ByteArrayOutputStream response,
                              Call call, Status status,
                              Writable rv, String errorClass, String error)
   throws IOException {

Reply via email to