Repository: hbase Updated Branches: refs/heads/0.94 a0b773824 -> 25d4d266f
HBASE-12968 [0.94]SecureServer should not ignore CallQueueSize. (hongyu bi) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25d4d266 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25d4d266 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25d4d266 Branch: refs/heads/0.94 Commit: 25d4d266f3c07e2af3663b8e806a0bb6b015522e Parents: a0b7738 Author: Lars Hofhansl <la...@apache.org> Authored: Sat Feb 7 23:39:08 2015 -0800 Committer: Lars Hofhansl <la...@apache.org> Committed: Sat Feb 7 23:39:08 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ipc/SecureServer.java | 18 ++++++++++++++++-- .../org/apache/hadoop/hbase/ipc/HBaseServer.java | 4 ++-- 2 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/25d4d266/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java ---------------------------------------------------------------------- diff --git a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java index e630125..c5fe8c1 100644 --- a/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java +++ b/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureServer.java @@ -612,15 +612,29 @@ public abstract class SecureServer extends HBaseServer { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id + long callSize = buf.length; if (LOG.isTraceEnabled()) { LOG.trace(" got #" + id); } + // Enforcing the call queue size, this triggers a retry in the client + if ((callSize + callQueueSize.get()) > maxQueueSize) { + final SecureCall callTooBig = + new SecureCall(id, null, this, responder, callSize); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + setupResponse(responseBuffer, callTooBig, Status.FATAL, null, + IOException.class.getName(), + "Call queue is full, is ipc.server.max.callqueue.size too small?"); + responder.doRespond(callTooBig); + return; + } + Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param param.readFields(dis); - SecureCall call = new SecureCall(id, param, this, responder, buf.length); + SecureCall call = new SecureCall(id, param, this, responder, callSize); + callQueueSize.add(callSize); if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { priorityCallQueue.put(call); @@ -757,4 +771,4 @@ public abstract class SecureServer extends HBaseServer { protocol, getConf(), addr); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25d4d266/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 261ce4a..417e2a2 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -204,7 +204,7 @@ public abstract class HBaseServer implements RpcServer { protected Configuration conf; private int maxQueueLength; - private int maxQueueSize; + protected int maxQueueSize; protected int socketSendBufferSize; protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives @@ -1606,7 +1606,7 @@ public abstract class HBaseServer implements RpcServer { * @param error error message, if the call failed * @throws IOException */ - private void setupResponse(ByteArrayOutputStream response, + protected void setupResponse(ByteArrayOutputStream response, Call call, Status status, Writable rv, String errorClass, String error) throws IOException {