Repository: hadoop
Updated Branches:
  refs/heads/trunk 8b5f2c372 -> 28eb2aabe


HDFS-11384. Balancer disperses getBlocks calls to avoid NameNode's rpc queue 
saturation. Contributed by Konstantin V Shvachko.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28eb2aab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28eb2aab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28eb2aab

Branch: refs/heads/trunk
Commit: 28eb2aabebd15c15a357d86e23ca407d3c85211c
Parents: 8b5f2c3
Author: Konstantin V Shvachko <s...@apache.org>
Authored: Wed Apr 26 17:28:49 2017 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Wed Apr 26 17:28:49 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 41 +++++++-
 .../hdfs/server/balancer/TestBalancer.java      | 98 ++++++++++++++++++--
 .../server/balancer/TestBalancerRPCDelay.java   | 32 +++++++
 .../blockmanagement/BlockManagerTestUtil.java   |  5 +
 .../hdfs/server/namenode/NameNodeAdapter.java   | 31 ++++++-
 5 files changed, 195 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/28eb2aab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index dc81901..91dc907 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -901,8 +902,11 @@ public class Dispatcher {
      * namenode for more blocks. It terminates when it has dispatch enough 
block
      * move tasks or it has received enough blocks from the namenode, or the
      * elapsed time of the iteration has exceeded the max time limit.
+     *
+     * @param delay - time to sleep before sending getBlocks. Intended to
+     * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
      */
-    private void dispatchBlocks() {
+    private void dispatchBlocks(long delay) {
       this.blocksToReceive = 2 * getScheduledSize();
       long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
@@ -927,15 +931,25 @@ public class Dispatcher {
         if (shouldFetchMoreBlocks()) {
           // fetch new blocks
           try {
+            if(delay > 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sleeping " + delay + "  msec.");
+              }
+              Thread.sleep(delay);
+            }
             final long received = getBlockList();
             if (received == 0) {
               return;
             }
             blocksToReceive -= received;
             continue;
+          } catch (InterruptedException ignored) {
+            // nothing to do
           } catch (IOException e) {
             LOG.warn("Exception while getting reportedBlock list", e);
             return;
+          } finally {
+            delay = 0L;
           }
         } else {
           // jump out of while-loop after the configured timeout.
@@ -1125,6 +1139,12 @@ public class Dispatcher {
   }
 
   /**
+   * The best-effort limit on the number of RPCs per second
+   * the Balancer will send to the NameNode.
+   */
+  final static int BALANCER_NUM_RPC_PER_SEC = 20;
+
+  /**
    * Dispatch block moves for each source. The thread selects blocks to move &
    * sends request to proxy source to initiate block move. The process is flow
    * controlled. Block selection is blocked if there are too many un-confirmed
@@ -1136,15 +1156,32 @@ public class Dispatcher {
     final long bytesLastMoved = getBytesMoved();
     final Future<?>[] futures = new Future<?>[sources.size()];
 
+    int concurrentThreads = Math.min(sources.size(),
+        ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
+    assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
+      LOG.debug("Balancer concurrent threads = " + concurrentThreads);
+      LOG.debug("Disperse Interval sec = " +
+          concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
+    }
+    long dSec = 0;
     final Iterator<Source> i = sources.iterator();
     for (int j = 0; j < futures.length; j++) {
       final Source s = i.next();
+      final long delay = dSec * 1000;
       futures[j] = dispatchExecutor.submit(new Runnable() {
         @Override
         public void run() {
-          s.dispatchBlocks();
+          s.dispatchBlocks(delay);
         }
       });
+      // Calculate delay in seconds for the next iteration
+      if(j >= concurrentThreads) {
+        dSec = 0;
+      } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
+        dSec++;
+      }
     }
 
     // wait for all dispatcher threads to finish

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28eb2aab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 30a3a32..e177da3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -50,6 +50,9 @@ import org.junit.AfterClass;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
 
 import java.io.File;
 import java.io.IOException;
@@ -105,9 +108,14 @@ import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.minikdc.MiniKdc;
@@ -121,6 +129,8 @@ import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This class tests if a balancer schedules tasks correctly.
@@ -130,6 +140,7 @@ public class TestBalancer {
 
   static {
     GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
   }
 
   final static long CAPACITY = 5000L;
@@ -776,6 +787,13 @@ public class TestBalancer {
     doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, 
false);
   }
 
+  private void doTest(Configuration conf, long[] capacities, String[] racks,
+      long newCapacity, String newRack, NewNodeInfo nodes,
+      boolean useTool, boolean useFile) throws Exception {
+    doTest(conf, capacities, racks, newCapacity, newRack, nodes,
+        useTool, useFile, false);
+  }
+
   /** This test start a cluster with specified number of nodes,
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
@@ -791,11 +809,13 @@ public class TestBalancer {
    *   parsing, etc.   Otherwise invoke balancer API directly.
    * @param useFile - if true, the hosts to included or excluded will be 
stored in a
    *   file and then later read from the file.
+   * @param useNamesystemSpy - spy on FSNamesystem if true
    * @throws Exception
    */
   private void doTest(Configuration conf, long[] capacities,
       String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
-      boolean useTool, boolean useFile) throws Exception {
+      boolean useTool, boolean useFile,
+      boolean useNamesystemSpy) throws Exception {
     LOG.info("capacities = " +  long2String(capacities));
     LOG.info("racks      = " +  Arrays.asList(racks));
     LOG.info("newCapacity= " +  newCapacity);
@@ -803,15 +823,25 @@ public class TestBalancer {
     LOG.info("useTool    = " +  useTool);
     assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(capacities.length)
-                                .racks(racks)
-                                .simulatedCapacities(capacities)
-                                .build();
+
     try {
+      cluster = new MiniDFSCluster.Builder(conf)
+                                  .numDataNodes(0)
+                                  .build();
+      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      if(useNamesystemSpy) {
+        LOG.info("Using Spy Namesystem");
+        spyFSNamesystem(cluster.getNameNode());
+      }
+      cluster.startDataNodes(conf, numOfDatanodes, true,
+          StartupOption.REGULAR, racks, null, capacities, false);
+      cluster.waitClusterUp();
       cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, 
cluster.getFileSystem(0).getUri(),
-          ClientProtocol.class).getProxy();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
 
       long totalCapacity = sum(capacities);
 
@@ -891,7 +921,9 @@ public class TestBalancer {
         runBalancer(conf, totalUsedSpace, totalCapacity, p, 
expectedExcludedNodes);
       }
     } finally {
-      cluster.shutdown();
+      if(cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
@@ -2004,6 +2036,54 @@ public class TestBalancer {
     }
   }
 
+  private static int numGetBlocksCalls;
+  private static long startGetBlocksTime, endGetBlocksTime;
+
+  private void spyFSNamesystem(NameNode nn) throws IOException {
+    FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
+    numGetBlocksCalls = 0;
+    endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
+    doAnswer(new Answer<BlocksWithLocations>() {
+      @Override
+      public BlocksWithLocations answer(InvocationOnMock invocation)
+          throws Throwable {
+        BlocksWithLocations blk =
+            (BlocksWithLocations)invocation.callRealMethod();
+        endGetBlocksTime = Time.monotonicNow();
+        numGetBlocksCalls++;
+        return blk;
+      }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
+  }
+
+  /**
+   * Test that makes the Balancer to disperse RPCs to the NameNode
+   * in order to avoid NN's RPC queue saturation.
+   */
+  void testBalancerRPCDelay() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
+
+    int numDNs = 40;
+    long[] capacities = new long[numDNs];
+    String[] racks = new String[numDNs];
+    for(int i = 0; i < numDNs; i++) {
+      capacities[i] = CAPACITY;
+      racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
+    }
+    doTest(conf, capacities, racks, CAPACITY, RACK2,
+        new PortNumberBasedNodes(3, 0, 0), false, false, true);
+    assertTrue("Number of getBlocks should be not less than " +
+        Dispatcher.BALANCER_NUM_RPC_PER_SEC,
+        numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
+    long d = 1 + endGetBlocksTime - startGetBlocksTime;
+    LOG.info("Balancer executed " + numGetBlocksCalls
+        + " getBlocks in " + d + " msec.");
+    assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
+        Dispatcher.BALANCER_NUM_RPC_PER_SEC,
+        (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
+  }
+
   /**
    * @param args
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28eb2aab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
new file mode 100644
index 0000000..960ad25
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.junit.Test;
+
+/**
+ * The Balancer ensures that it disperses RPCs to the NameNode
+ * in order to avoid NN's RPC queue saturation.
+ */
+public class TestBalancerRPCDelay {
+
+  @Test(timeout=100000)
+  public void testBalancerRPCDelay() throws Exception {
+    new TestBalancer().testBalancerRPCDelay();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28eb2aab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index ee4e2b0..77e2ffb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -142,6 +142,11 @@ public class BlockManagerTestUtil {
     }
   }
 
+  public static HeartbeatManager getHeartbeatManager(
+      final BlockManager blockManager) {
+    return blockManager.getDatanodeManager().getHeartbeatManager();
+  }
+
   /**
    * @return corruptReplicas from block manager
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/28eb2aab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 33af59e..242e8f5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -189,7 +190,35 @@ public class NameNodeAdapter {
   public static long[] getStats(final FSNamesystem fsn) {
     return fsn.getStats();
   }
-  
+
+  public static FSNamesystem spyOnNamesystem(NameNode nn) {
+    FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
+    FSNamesystem fsnOld = nn.namesystem;
+    fsnOld.writeLock();
+    fsnSpy.writeLock();
+    nn.namesystem = fsnSpy;
+    try {
+      FieldUtils.writeDeclaredField(
+          (NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager().getDatanodeManager(),
+          "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
+          "namesystem", fsnSpy, true);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Cannot set spy FSNamesystem", e);
+    } finally {
+      fsnSpy.writeUnlock();
+      fsnOld.writeUnlock();
+    }
+    return fsnSpy;
+  }
+
   public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
     ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
     fsn.setFsLockForTests(spy);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to