HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/49a09179
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/49a09179
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/49a09179

Branch: refs/heads/YARN-2915
Commit: 49a09179e3fadae090126261be0a7fe0aa48798e
Parents: f922067
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon Nov 21 10:13:24 2016 -0600
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon Nov 21 10:13:24 2016 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 +++
 .../hadoop/hdfs/server/balancer/Balancer.java   |  5 ++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 34 ++++++++++++--------
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  5 ++-
 .../src/main/resources/hdfs-default.xml         | 20 ++++++++++++
 .../hdfs/server/balancer/TestBalancer.java      |  2 ++
 6 files changed, 55 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b1fb3f4..b9fd939 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -502,6 +502,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
   public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
   public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
+  public static final String  DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
+  public static final int    DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
@@ -519,6 +521,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.mover.keytab.file";
   public static final String  DFS_MOVER_KERBEROS_PRINCIPAL_KEY =
       "dfs.mover.kerberos.principal";
+  public static final String  DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
+  public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;
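
For reference, a minimal sketch (not part of this patch) of overriding the new balancer key programmatically; the class name and the 2-minute value are illustrative only, and the key can equally be set in hdfs-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MaxNoMoveIntervalExample {
      public static void main(String[] args) {
        // HdfsConfiguration loads hdfs-default.xml plus any hdfs-site.xml overrides.
        Configuration conf = new HdfsConfiguration();
        // Give up on a source DataNode after 2 minutes without a scheduled move;
        // the default introduced above is 60*1000 ms (one minute). Value is in ms.
        conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 2 * 60 * 1000);
        System.out.println(conf.getInt(
            DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
            DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT));
      }
    }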

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 583ade3..61352f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -285,13 +285,16 @@ public class Balancer {
     final int blockMoveTimeout = conf.getInt(
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, blockMoveTimeout, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e090174..eb3ed87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -122,6 +122,11 @@ public class Dispatcher {
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
   private final long blockMoveTimeout;
+  /**
+   * If no block can be moved out of a {@link Source} after this configured
+   * amount of time, the Source should give up choosing the next possible move.
+   */
+  private final int maxNoMoveInterval;
 
   private final int ioFileBufferSize;
 
@@ -866,7 +871,7 @@ public class Dispatcher {
      */
     private void dispatchBlocks() {
       this.blocksToReceive = 2 * getScheduledSize();
-      int noPendingMoveIteration = 0;
+      long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
@@ -876,8 +881,8 @@ public class Dispatcher {
         }
         final PendingMove p = chooseNextMove();
         if (p != null) {
-          // Reset no pending move counter
-          noPendingMoveIteration=0;
+          // Reset previous move timestamp
+          previousMoveTimestamp = Time.monotonicNow();
           executePendingMove(p);
           continue;
         }
@@ -900,13 +905,11 @@ public class Dispatcher {
             return;
           }
         } else {
-          // source node cannot find a pending block to move, iteration +1
-          noPendingMoveIteration++;
-          // in case no blocks can be moved for source node's task,
-          // jump out of while-loop after 5 iterations.
-          if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
-            LOG.info("Failed to find a pending move "  + noPendingMoveIteration
-                + " times.  Skipping " + this);
+          // jump out of while-loop after the configured timeout.
+          long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
+          if (noMoveInterval > maxNoMoveInterval) {
+            LOG.info("Failed to find a pending move for "  + noMoveInterval
+                + " ms.  Skipping " + this);
             resetScheduledSize();
           }
         }
@@ -917,6 +920,9 @@ public class Dispatcher {
           synchronized (Dispatcher.this) {
             Dispatcher.this.wait(1000); // wait for targets/sources to be idle
           }
+          // Didn't find a possible move in this iteration of the while loop,
+          // adding a small delay before choosing next move again.
+          Thread.sleep(100);
         } catch (InterruptedException ignored) {
         }
       }
@@ -941,17 +947,18 @@ public class Dispatcher {
   /** Constructor called by Mover. */
   public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
-      int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
+      int dispatcherThreads, int maxConcurrentMovesPerNode,
+      int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0,  conf);
+        0L, 0L, 0, maxNoMoveInterval, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
       long getBlocksSize, long getBlocksMinBlockSize,
-      int blockMoveTimeout, Configuration conf) {
+      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -967,6 +974,7 @@ public class Dispatcher {
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
     this.blockMoveTimeout = blockMoveTimeout;
+    this.maxNoMoveInterval = maxNoMoveInterval;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
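
To make the dispatchBlocks() change above easier to follow, here is a self-contained sketch of the timestamp-based give-up pattern it introduces (MoveSource, tryNextMove and drain are illustrative names, not Dispatcher APIs): the clock resets on every successful move, and the source is abandoned once maxNoMoveInterval elapses with no progress. The patch itself uses Hadoop's Time.monotonicNow() and also exits once the scheduled size reaches zero; the sketch uses System.nanoTime() to stay dependency-free.

    public class NoMoveGiveUpSketch {
      /** Illustrative stand-in for a Source that tries to schedule one block move. */
      interface MoveSource {
        boolean tryNextMove(); // true if a pending move was found and scheduled
      }

      static void drain(MoveSource source, int maxNoMoveIntervalMs)
          throws InterruptedException {
        long previousMoveTimestamp = System.nanoTime() / 1_000_000L;
        while (true) {
          if (source.tryNextMove()) {
            // Progress was made; reset the no-move clock.
            previousMoveTimestamp = System.nanoTime() / 1_000_000L;
            continue;
          }
          long noMoveInterval = System.nanoTime() / 1_000_000L - previousMoveTimestamp;
          if (noMoveInterval > maxNoMoveIntervalMs) {
            // Nothing moved for the configured interval; give up on this source.
            return;
          }
          Thread.sleep(100); // small back-off before retrying, as in the patch
        }
      }
    }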

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 0c85374..4fbae76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -130,13 +130,16 @@ public class Mover {
     final int maxConcurrentMovesPerNode = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
     this.retryMaxAttempts = conf.getInt(
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
     this.retryCount = retryCount;
     this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
         Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
-        maxConcurrentMovesPerNode, conf);
+        maxConcurrentMovesPerNode, maxNoMoveInterval, conf);
     this.storages = new StorageMap();
     this.targetPaths = nnc.getTargetPaths();
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5d6376a..acb24fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3252,6 +3252,16 @@
 </property>
 
 <property>
+  <name>dfs.balancer.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If this specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, no more effort will be made to move blocks out of
+    this DataNode in the current Balancer iteration.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>
   <description>
@@ -3844,6 +3854,16 @@
 </property>
 
 <property>
+  <name>dfs.mover.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If this specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, no more effort will be made to move blocks out of
+    this DataNode in the current Mover iteration.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.audit.log.async</name>
   <value>false</value>
   <description>
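
Both new properties above are independent (one for the Balancer, one for the Mover) and are expressed in milliseconds. A brief sketch, assuming a Configuration backed by these defaults, of how each daemon's value is resolved:

    // Reads each key independently; overriding one does not affect the other.
    Configuration conf = new HdfsConfiguration(); // picks up hdfs-site.xml overrides
    int balancerGiveUpMs = conf.getInt(
        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT); // 60000 ms if unset
    int moverGiveUpMs = conf.getInt(
        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);    // 60000 ms if unset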

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49a09179/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index b63aa4f..730f655 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -184,6 +184,7 @@ public class TestBalancer {
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
   }
 
   static void initConfWithRamDisk(Configuration conf,
@@ -194,6 +195,7 @@ public class TestBalancer {
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
     LazyPersistTestCase.initCacheManipulator();
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);

