Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 b2220685e -> b5f13f6dc


HDFS-13488. RBF: Reject requests when a Router is overloaded. Contributed by Inigo Goiri.

(cherry picked from commit 37269261d1232bc71708f30c76193188258ef4bd)
(cherry picked from commit 5fef28d0d4b27c5df31d325650e46f5ab5f5630f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5f13f6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5f13f6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5f13f6d

Branch: refs/heads/branch-2.9
Commit: b5f13f6dcf2c6d4ea51ed7d53345bf35bfd31bab
Parents: b222068
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed May 2 14:49:39 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed May 2 15:16:47 2018 +0800

----------------------------------------------------------------------
 .../federation/metrics/FederationRPCMBean.java  |   2 +
 .../metrics/FederationRPCMetrics.java           |  10 +
 .../FederationRPCPerformanceMonitor.java        |   5 +
 .../server/federation/router/RBFConfigKeys.java |   3 +
 .../federation/router/RouterRpcClient.java      |  31 ++-
 .../federation/router/RouterRpcMonitor.java     |   6 +
 .../federation/router/RouterRpcServer.java      |  11 +-
 .../router/RouterSafeModeException.java         |  53 ----
 .../src/main/resources/hdfs-rbf-default.xml     |   9 +
 .../server/federation/StateStoreDFSCluster.java |  28 +++
 .../router/TestRouterClientRejectOverload.java  | 243 +++++++++++++++++++
 .../router/TestRouterRPCClientRetries.java      |  51 +---
 .../federation/router/TestRouterSafemode.java   |   3 +-
 13 files changed, 348 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
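The change in brief: when dfs.federation.router.client.reject.overload is
enabled, the Router's RPC client pool is backed by a bounded queue, and work
rejected by the pool is reported to clients as a StandbyException so that
HA-configured clients fail over to another Router; when disabled (the
default), the queue is unbounded and requests simply wait. A minimal sketch
of enabling the feature programmatically; the Router startup code around it
is assumed, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

    // Reject client requests instead of queueing them when all RPC client
    // threads are busy (the key defaults to false).
    Configuration routerConf = new Configuration();
    routerConf.setBoolean(
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, true);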


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 3e031fe..973c398 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -40,6 +40,8 @@ public interface FederationRPCMBean {
 
   long getProxyOpFailureStandby();
 
+  long getProxyOpFailureClientOverloaded();
+
   long getProxyOpNotImplemented();
 
   long getProxyOpRetries();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 94d3383..9ab4e5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -54,6 +54,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   private MutableCounterLong proxyOpFailureStandby;
   @Metric("Number of operations to hit a standby NN")
   private MutableCounterLong proxyOpFailureCommunicate;
+  @Metric("Number of operations to hit a client overloaded Router")
+  private MutableCounterLong proxyOpFailureClientOverloaded;
   @Metric("Number of operations not implemented")
   private MutableCounterLong proxyOpNotImplemented;
   @Metric("Number of operation retries")
@@ -118,6 +120,14 @@ public class FederationRPCMetrics implements FederationRPCMBean {
     return proxyOpFailureCommunicate.value();
   }
 
+  public void incrProxyOpFailureClientOverloaded() {
+    proxyOpFailureClientOverloaded.incr();
+  }
+
+  @Override
+  public long getProxyOpFailureClientOverloaded() {
+    return proxyOpFailureClientOverloaded.value();
+  }
 
   public void incrProxyOpNotImplemented() {
     proxyOpNotImplemented.incr();

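Like the other proxy failure counters, the new one is exposed through
FederationRPCMBean and can be read over JMX. A hedged sketch to illustrate;
it would need to run inside the Router process (or be adapted for a remote
JMX connection), and the ObjectName below follows Hadoop's usual
"Hadoop:service=<service>,name=<record>" convention for the FederationRPC
record, which is an assumption rather than something this patch defines:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class OverloadCounterProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed name of the FederationRPC metrics bean inside the Router.
        ObjectName rpcBean =
            new ObjectName("Hadoop:service=Router,name=FederationRPC");
        long rejected = (Long) mbs.getAttribute(
            rpcBean, "ProxyOpFailureClientOverloaded");
        System.out.println("Requests rejected as overloaded: " + rejected);
      }
    }
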
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index 547ebb5..2c2741e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -154,6 +154,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
   }
 
   @Override
+  public void proxyOpFailureClientOverloaded() {
+    metrics.incrProxyOpFailureClientOverloaded();
+  }
+
+  @Override
   public void proxyOpNotImplemented() {
     metrics.incrProxyOpNotImplemented();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 091ff75..6ab3fc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -113,6 +113,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
       FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
   public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+  public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
+      FEDERATION_ROUTER_PREFIX + "client.reject.overload";
+  public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 214e438..e4d304d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -35,13 +35,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -98,7 +101,7 @@ public class RouterRpcClient {
   /** Connection pool to the Namenodes per user for performance. */
   private final ConnectionManager connectionManager;
   /** Service to run asynchronous calls. */
-  private final ExecutorService executorService;
+  private final ThreadPoolExecutor executorService;
   /** Retry policy for router -> NN communication. */
   private final RetryPolicy retryPolicy;
   /** Optional perf monitor. */
@@ -131,8 +134,16 @@ public class RouterRpcClient {
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router Client-%d")
         .build();
-    this.executorService = Executors.newFixedThreadPool(
-        numThreads, threadFactory);
+    BlockingQueue<Runnable> workQueue;
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
+      workQueue = new ArrayBlockingQueue<>(numThreads);
+    } else {
+      workQueue = new LinkedBlockingQueue<>();
+    }
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     this.rpcMonitor = monitor;
 
@@ -1098,6 +1109,16 @@ public class RouterRpcClient {
       }
 
       return results;
+    } catch (RejectedExecutionException e) {
+      if (rpcMonitor != null) {
+        rpcMonitor.proxyOpFailureClientOverloaded();
+      }
+      int active = executorService.getActiveCount();
+      int total = executorService.getMaximumPoolSize();
+      String msg = "Not enough client threads " + active + "/" + total;
+      LOG.error(msg);
+      throw new StandbyException(
+          "Router " + routerId + " is overloaded: " + msg);
     } catch (InterruptedException ex) {
       LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
       throw new IOException(

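The RejectedExecutionException path above relies on standard
java.util.concurrent behavior: a ThreadPoolExecutor built over a bounded
ArrayBlockingQueue uses the default AbortPolicy, so submit() throws once all
workers are busy and the queue is full, and the Router rewraps that as a
StandbyException. A standalone sketch of the mechanism; pool size, queue
depth, and the sleeping task are illustrative only:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class RejectionSketch {
      public static void main(String[] args) {
        // 2 workers plus a queue of 2: at most 4 tasks in flight.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
            0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2));
        for (int i = 0; i < 6; i++) {
          try {
            pool.submit(new Runnable() {
              @Override
              public void run() {
                try {
                  Thread.sleep(1000); // keep the workers busy
                } catch (InterruptedException ignored) {
                }
              }
            });
          } catch (RejectedExecutionException e) {
            // The 5th and 6th submissions land here; the Router turns this
            // into a "Router ... is overloaded" StandbyException.
            System.out.println("Rejected: " + pool.getActiveCount() + "/"
                + pool.getMaximumPoolSize() + " threads busy");
          }
        }
        pool.shutdown();
      }
    }
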
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
index df9aa11..7af71af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -76,6 +76,12 @@ public interface RouterRpcMonitor {
   void proxyOpFailureCommunicate();
 
   /**
+   * Failed to proxy an operation to a Namenode because the client was
+   * overloaded.
+   */
+  void proxyOpFailureClientOverloaded();
+
+  /**
    * Failed to proxy an operation because it is not implemented.
    */
   void proxyOpNotImplemented();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 3c82141..98821b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -273,7 +273,6 @@ public class RouterRpcServer extends AbstractService
     // We don't want the server to log the full stack trace for some exceptions
     this.rpcServer.addTerseExceptions(
         RemoteException.class,
-        StandbyException.class,
         SafeModeException.class,
         FileNotFoundException.class,
         FileAlreadyExistsException.class,
@@ -282,6 +281,9 @@ public class RouterRpcServer extends AbstractService
         NotReplicatedYetException.class,
         IOException.class);
 
+    this.rpcServer.addSuppressedLoggingExceptions(
+        StandbyException.class);
+
     // The RPC-server port can be ephemeral... ensure we have the correct info
     InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
     this.rpcAddress = new InetSocketAddress(
@@ -395,7 +397,7 @@ public class RouterRpcServer extends AbstractService
    * @throws UnsupportedOperationException If the operation is not supported.
    */
   protected void checkOperation(OperationCategory op, boolean supported)
-      throws RouterSafeModeException, UnsupportedOperationException {
+      throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
     if (!supported) {
@@ -417,7 +419,7 @@ public class RouterRpcServer extends AbstractService
    *                           client requests.
    */
   protected void checkOperation(OperationCategory op)
-      throws RouterSafeModeException {
+      throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
       rpcMonitor.startOp();
@@ -441,7 +443,8 @@ public class RouterRpcServer extends AbstractService
       if (rpcMonitor != null) {
         rpcMonitor.routerFailureSafemode();
       }
-      throw new RouterSafeModeException(router.getRouterId(), op);
+      throw new StandbyException("Router " + router.getRouterId() +
+          " is in safe mode and cannot handle " + op + " requests");
     }
   }
 

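Replacing RouterSafeModeException with a plain StandbyException (and merely
suppressing its logging) is what makes both safe mode and the new overload
rejection transparent to clients: the stock HDFS HA failover proxy treats
StandbyException as the signal to retry against the other endpoint. A minimal
client-side sketch, assuming two Routers on host1/host2 listening on the
default Router RPC port 8888 (the hostnames and port are assumptions, not
part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration clientConf = new HdfsConfiguration();
    clientConf.set("fs.defaultFS", "hdfs://fed");
    clientConf.set("dfs.nameservices", "fed");
    clientConf.set("dfs.ha.namenodes.fed", "r0,r1");
    clientConf.set("dfs.namenode.rpc-address.fed.r0", "host1:8888");
    clientConf.set("dfs.namenode.rpc-address.fed.r1", "host2:8888");
    // Fail over between Routers on StandbyException, exactly as between
    // HA NameNodes.
    clientConf.set("dfs.client.failover.proxy.provider.fed",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
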
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
deleted file mode 100644
index 7a78b5b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.ipc.StandbyException;
-
-/**
- * Exception that the Router throws when it is in safe mode. This extends
- * {@link StandbyException} for the client to try another Router when it gets
- * this exception.
- */
-public class RouterSafeModeException extends StandbyException {
-
-  private static final long serialVersionUID = 453568188334993493L;
-
-  /** Identifier of the Router that generated this exception. */
-  private final String routerId;
-
-  /**
-   * Build a new Router safe mode exception.
-   * @param router Identifier of the Router.
-   * @param op Category of the operation (READ/WRITE).
-   */
-  public RouterSafeModeException(String router, OperationCategory op) {
-    super("Router " + router + " is in safe mode and cannot handle " + op
-        + " requests.");
-    this.routerId = router;
-  }
-
-  /**
-   * Get the id of the Router that generated this exception.
-   * @return Id of the Router that generated this exception.
-   */
-  public String getRouterId() {
-    return this.routerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index e4edb42..8ab356f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -410,4 +410,13 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.client.reject.overload</name>
+    <value>false</value>
+    <description>
+      Set to true to reject client requests when we run out of RPC client
+      threads.
+    </description>
+  </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
index bf63b18..9d56f13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
@@ -28,6 +28,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -37,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Test utility to mimic a federated HDFS cluster with a router and a state
@@ -145,4 +150,27 @@ public class StateStoreDFSCluster extends MiniRouterDFSCluster {
     entries.add(entry);
     return entries;
   }
+
+  /**
+   * Get the client configuration which targets all the Routers. It uses the HA
+   * setup to fail over between them.
+   * @return Configuration for the client which uses two routers.
+   */
+  public Configuration getRouterClientConf() {
+    List<RouterContext> routers = getRouters();
+    Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
+    int i = 0;
+    List<String> names = new ArrayList<>(routers.size());
+    for (RouterContext routerContext : routers) {
+      String name = "r" + i++;
+      clientConf.set(
+          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
+          "localhost:" + routerContext.getRpcPort());
+      names.add(name);
+    }
+    clientConf.set(DFSUtil.addKeySuffixes(
+        HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
+        StringUtils.join(",", names));
+    return clientConf;
+  }
 }

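For reference, the new overload test consumes this helper roughly as in the
hedged sketch below (the try/finally shape is illustrative; the real test
wraps this in worker threads):

    Configuration clientConf = cluster.getRouterClientConf();
    // "fed" matches the logical nameservice getRouterClientConf() builds
    // through DFSTestUtil.newHAConfiguration("fed").
    DFSClient client = new DFSClient(new URI("hdfs://fed/"), clientConf);
    try {
      // renewLease() fans out to every subcluster, which the tests use to
      // load all Routers at once.
      client.getNamenode().renewLease(client.getClientName());
    } finally {
      client.close();
    }
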
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
new file mode 100644
index 0000000..3c51e13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the Router overload control which rejects requests when the RPC client
+ * is overloaded. This feature is managed by
+ * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
+ */
+public class TestRouterClientRejectOverload {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
+
+  private StateStoreDFSCluster cluster;
+
+  @After
+  public void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void setupCluster(boolean overloadControl) throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .metrics()
+        .admin()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC client threads to make the Router easy to overload
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
+    // Overload control
+    routerConf.setBoolean(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
+
+    // No need for datanodes as we use renewLease() for testing
+    cluster.setNumDatanodesPerNameservice(0);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testWithoutOverloadControl() throws Exception {
+    setupCluster(false);
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // Nobody should get overloaded, but it will be really slow
+    testOverloaded(0);
+
+    // No rejected requests expected
+    for (RouterContext router : cluster.getRouters()) {
+      FederationRPCMetrics rpcMetrics =
+          router.getRouter().getRpcServer().getRPCMetrics();
+      assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
+    }
+  }
+
+  @Test
+  public void testOverloadControl() throws Exception {
+    setupCluster(true);
+
+    List<RouterContext> routers = cluster.getRouters();
+    FederationRPCMetrics rpcMetrics0 =
+        routers.get(0).getRouter().getRpcServer().getRPCMetrics();
+    FederationRPCMetrics rpcMetrics1 =
+        routers.get(1).getRouter().getRpcServer().getRPCMetrics();
+
+    // Nobody should get overloaded
+    testOverloaded(0);
+    assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
+    assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
+
+    // Set subcluster 0 as slow
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    NameNode nn0 = dfsCluster.getNameNode(0);
+    simulateSlowNamenode(nn0, 1);
+
+    // The subcluster should be overloaded now and reject 4 to 6 requests
+    testOverloaded(4, 6);
+    assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded()
+        + rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
+
+    // Client using HA with 2 Routers
+    // A single Router gets overloaded, but 2 will handle it
+    Configuration clientConf = cluster.getRouterClientConf();
+
+    // Each Router should get a similar number of ops (>=8) out of 2*10
+    long iniProxyOps0 = rpcMetrics0.getProxyOps();
+    long iniProxyOps1 = rpcMetrics1.getProxyOps();
+    testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
+    long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
+    long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
+    assertEquals(2 * 10, proxyOps0 + proxyOps1);
+    assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
+    assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
+  }
+
+  private void testOverloaded(int expOverload) throws Exception {
+    testOverloaded(expOverload, expOverload);
+  }
+
+  private void testOverloaded(int expOverloadMin, int expOverloadMax)
+      throws Exception {
+    RouterContext routerContext = cluster.getRandomRouter();
+    URI address = routerContext.getFileSystemURI();
+    Configuration conf = new HdfsConfiguration();
+    testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
+  }
+
+  /**
+   * Test if the Router gets overloaded by submitting requests in parallel.
+   * We check how many requests got rejected at the end.
+   * @param expOverloadMin Min number of requests expected as overloaded.
+   * @param expOverloadMax Max number of requests expected as overloaded.
+   * @param address Destination address.
+   * @param conf Configuration of the client.
+   * @param numOps Number of operations to submit.
+   * @throws Exception If it cannot perform the test.
+   */
+  private void testOverloaded(int expOverloadMin, int expOverloadMax,
+      final URI address, final Configuration conf, final int numOps)
+          throws Exception {
+
+    // Submit renewLease() ops which go to all subclusters
+    final AtomicInteger overloadException = new AtomicInteger();
+    ExecutorService exec = Executors.newFixedThreadPool(numOps);
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < numOps; i++) {
+      // Stagger the operations a little (50ms)
+      final int sleepTime = i * 50;
+      Future<?> future = exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          DFSClient routerClient = null;
+          try {
+            Thread.sleep(sleepTime);
+            routerClient = new DFSClient(address, conf);
+            String clientName = routerClient.getClientName();
+            ClientProtocol routerProto = routerClient.getNamenode();
+            routerProto.renewLease(clientName);
+          } catch (RemoteException re) {
+            IOException ioe = re.unwrapRemoteException();
+            assertTrue("Wrong exception: " + ioe,
+                ioe instanceof StandbyException);
+            assertExceptionContains("is overloaded", ioe);
+            overloadException.incrementAndGet();
+          } catch (IOException e) {
+            fail("Unexpected exception: " + e);
+          } catch (InterruptedException e) {
+            fail("Cannot sleep: " + e);
+          } finally {
+            if (routerClient != null) {
+              try {
+                routerClient.close();
+              } catch (IOException e) {
+                LOG.error("Cannot close the client");
+              }
+            }
+          }
+        }
+      });
+      futures.add(future);
+    }
+    // Wait until all the requests are done
+    while (!futures.isEmpty()) {
+      futures.remove(0).get();
+    }
+    exec.shutdown();
+
+    int num = overloadException.get();
+    if (expOverloadMin == expOverloadMax) {
+      assertEquals(expOverloadMin, num);
+    } else {
+      assertTrue("Expected >=" + expOverloadMin + " but was " + num,
+          num >= expOverloadMin);
+      assertTrue("Expected <=" + expOverloadMax + " but was " + num,
+          num <= expOverloadMax);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 91dc2e7..e5ab3ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.List;
@@ -44,12 +44,8 @@ import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.After;
@@ -57,11 +53,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Supplier;
 
@@ -70,9 +61,6 @@ import com.google.common.base.Supplier;
  */
 public class TestRouterRPCClientRetries {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
-
   private static StateStoreDFSCluster cluster;
   private static NamenodeContext nnContext1;
   private static RouterContext routerContext;
@@ -144,7 +132,7 @@ public class TestRouterRPCClientRetries {
       fail("Should have thrown RemoteException error.");
     } catch (RemoteException e) {
       String ns0 = cluster.getNameservices().get(0);
-      GenericTestUtils.assertExceptionContains(
+      assertExceptionContains(
           "No namenode available under nameservice " + ns0, e);
     }
 
@@ -212,14 +200,14 @@ public class TestRouterRPCClientRetries {
     // Making subcluster0 slow to reply, should only get DNs from nn1
     MiniDFSCluster dfsCluster = cluster.getCluster();
     NameNode nn0 = dfsCluster.getNameNode(0);
-    simulateNNSlow(nn0);
+    simulateSlowNamenode(nn0, 3);
     waitUpdateLiveNodes(jsonString2, metrics);
     final String jsonString3 = metrics.getLiveNodes();
     assertEquals(2, getNumDatanodes(jsonString3));
 
     // Making subcluster1 slow to reply, shouldn't get any DNs
     NameNode nn1 = dfsCluster.getNameNode(1);
-    simulateNNSlow(nn1);
+    simulateSlowNamenode(nn1, 3);
     waitUpdateLiveNodes(jsonString3, metrics);
     final String jsonString4 = metrics.getLiveNodes();
     assertEquals(0, getNumDatanodes(jsonString4));
@@ -249,36 +237,11 @@ public class TestRouterRPCClientRetries {
   private static void waitUpdateLiveNodes(
       final String oldValue, final NamenodeBeanMetrics metrics)
           throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+    waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return !oldValue.equals(metrics.getLiveNodes());
       }
     }, 500, 5 * 1000);
   }
-
-  /**
-   * Simulate that a Namenode is slow by adding a sleep to the check operation
-   * in the NN.
-   * @param nn Namenode to simulate slow.
-   * @throws Exception If we cannot add the sleep time.
-   */
-  private static void simulateNNSlow(final NameNode nn) throws Exception {
-    FSNamesystem namesystem = nn.getNamesystem();
-    HAContext haContext = namesystem.getHAContext();
-    HAContext spyHAContext = spy(haContext);
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        LOG.info("Simulating slow namenode {}", invocation.getMock());
-        try {
-          Thread.sleep(3 * 1000);
-        } catch(InterruptedException e) {
-          LOG.error("Simulating a slow namenode aborted");
-        }
-        return null;
-      }
-    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
-    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5f13f6d/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
index e5d8348..f16ceb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -187,7 +188,7 @@ public class TestRouterSafemode {
     try {
       router.getRpcServer().delete("/testfile.txt", true);
       fail("We should have thrown a safe mode exception");
-    } catch (RouterSafeModeException sme) {
+    } catch (StandbyException sme) {
       exception = true;
     }
     assertTrue("We should have thrown a safe mode exception", exception);


