Repository: hbase
Updated Branches:
  refs/heads/branch-1 36d634d35 -> 7e0e86072


HBASE-15703 Deadline scheduler needs to return to the client info about skipped calls, not just drop them


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e0e8607
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e0e8607
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e0e8607

Branch: refs/heads/branch-1
Commit: 7e0e86072aa2f372184d017aa18555fafa4bd459
Parents: 36d634d
Author: Mikhail Antonov <anto...@apache.org>
Authored: Mon May 2 15:23:07 2016 -0700
Committer: Mikhail Antonov <anto...@apache.org>
Committed: Mon May 2 15:27:13 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/CallDroppedException.java      | 43 ++++++++++++++++++++
 .../hadoop/hbase/CallQueueTooBigException.java  |  2 +
 .../client/PreemptiveFastFailInterceptor.java   |  3 +-
 .../hbase/exceptions/ClientExceptionsUtil.java  | 15 ++++++-
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   | 42 ++++++-------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java | 38 +++++++++++++++++
 6 files changed, 112 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java
new file mode 100644
index 0000000..ed14153
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Returned to the clients when their request was discarded due to the server being overloaded.
+ * Clients should retry upon receiving it.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallDroppedException extends IOException {
+  public CallDroppedException() {
+    super();
+  }
+
+  // Absence of this constructor prevents proper unwrapping of
+  // remote exception on the client side
+  public CallDroppedException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
index 95ca988..9f8b386 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java
@@ -31,6 +31,8 @@ public class CallQueueTooBigException extends IOException {
     super();
   }
 
+  // Absence of this constructor prevents proper unwrapping of
+  // remote exception on the client side
   public CallQueueTooBigException(String message) {
     super(message);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
index c87d6c7..fed87c1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
@@ -175,7 +175,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
     boolean isLocalException = !(t2 instanceof RemoteException);
 
     if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) ||
-         ClientExceptionsUtil.isCallQueueTooBigException(t2)) {
+         ClientExceptionsUtil.isCallQueueTooBigException(t2) ||
+         ClientExceptionsUtil.isCallDroppedException(t2)) {
       couldNotCommunicateWithServer.setValue(true);
       guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
       handleFailureToServer(serverName, t2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
index cf2a16f..f367ed9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java
@@ -28,6 +28,7 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
@@ -61,7 +62,8 @@ public final class ClientExceptionsUtil {
     return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
         || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
         || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
-        || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
+        || cur instanceof CallQueueTooBigException || cur instanceof CallDroppedException
+        || cur instanceof NotServingRegionException);
   }
 
 
@@ -119,6 +121,17 @@ public final class ClientExceptionsUtil {
   }
 
   /**
+   * Checks if the exception is CallDroppedException (maybe wrapped
+   * into some RemoteException).
+   * @param t exception to check
+   * @return true if it's a CallDroppedException, false otherwise
+   */
+  public static boolean isCallDroppedException(Throwable t) {
+    t = findException(t);
+    return (t instanceof CallDroppedException);
+  }
+
+  /**
    * Check if the exception is something that indicates that we cannot
    * contact/communicate with the server.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 266c6a2..08c488b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,16 +55,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
   private AtomicLong numGeneralCallsDropped;
   private AtomicLong numLifoModeSwitches;
 
-  /**
-   * Lock held by take ops, all other locks are inside queue impl.
-   *
-   * NOTE: We want to have this lock so that in case when there're lot of already expired
-   * calls in the call queue a handler thread calling take() can just grab lock once and
-   * then fast-forward through the expired calls to the first non-expired without having
-   * to contend for locks on every element in underlying queue.
-   */
-  private final ReentrantLock lock = new ReentrantLock();
-
   // Both are in milliseconds
   private volatile int codelTargetDelay;
   private volatile int codelInterval;
@@ -120,25 +109,20 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
    */
   @Override
   public CallRunner take() throws InterruptedException {
-    final ReentrantLock lock = this.lock;
-    lock.lock();
-    try {
-      CallRunner cr;
-      while(true) {
-        if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
-          numLifoModeSwitches.incrementAndGet();
-          cr = queue.takeLast();
-        } else {
-          cr = queue.takeFirst();
-        }
-        if (needToDrop(cr)) {
-          numGeneralCallsDropped.incrementAndGet();
-        } else {
-          return cr;
-        }
+    CallRunner cr;
+    while(true) {
+      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
+        numLifoModeSwitches.incrementAndGet();
+        cr = queue.takeLast();
+      } else {
+        cr = queue.takeFirst();
+      }
+      if (needToDrop(cr)) {
+        numGeneralCallsDropped.incrementAndGet();
+        cr.drop();
+      } else {
+        return cr;
       }
-    } finally {
-      lock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e0e8607/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index a9cf0f1..3514245 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -21,6 +21,7 @@ import java.nio.channels.ClosedChannelException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -45,6 +46,9 @@ import com.google.protobuf.Message;
 public class CallRunner {
   private static final Log LOG = LogFactory.getLog(CallRunner.class);
 
+  private static final CallDroppedException CALL_DROPPED_EXCEPTION
+    = new CallDroppedException();
+
   private Call call;
   private RpcServerInterface rpcServer;
   private MonitoredRPCHandler status;
@@ -161,4 +165,38 @@ public class CallRunner {
       cleanup();
     }
   }
+
+  /**
+   * Drops this call because the server is overloaded, telling the client to retry.
+   */
+  public void drop() {
+    try {
+      if (!call.connection.channel.isOpen()) {
+        if (RpcServer.LOG.isDebugEnabled()) {
+          RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
+        }
+        return;
+      }
+
+      // Set the response
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
+        + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
+      call.sendResponseIfReady();
+    } catch (ClosedChannelException cce) {
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
+        "this means that the server " + (address != null ? address : "(channel closed)") +
+        " was processing a request but the client went away. The error message was: " +
+        cce.getMessage());
+    } catch (Exception e) {
+      RpcServer.LOG.warn(Thread.currentThread().getName()
+        + ": caught: " + StringUtils.stringifyException(e));
+    } finally {
+      if (!sucessful) {
+        this.rpcServer.addCallSize(call.getSize() * -1);
+      }
+      cleanup();
+    }
+  }
 }

Reply via email to