Repository: hadoop
Updated Branches:
  refs/heads/trunk 09ef97dcc -> f6367c5f4


HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6367c5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6367c5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6367c5f

Branch: refs/heads/trunk
Commit: f6367c5f44a88cb5eb7edffb015b10b657504a61
Parents: 09ef97d
Author: Zhe Zhang <z...@apache.org>
Authored: Tue Oct 25 10:18:57 2016 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Oct 25 10:19:13 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  2 +
 .../hadoop/hdfs/server/balancer/Balancer.java   |  5 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 49 +++++++++++++++-----
 .../src/main/resources/hdfs-default.xml         | 15 ++++++
 4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d54c109..951ad68 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -496,6 +496,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
   public static final String  DFS_BALANCER_KEYTAB_FILE_KEY = 
"dfs.balancer.keytab.file";
   public static final String  DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = 
"dfs.balancer.kerberos.principal";
+  public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = 
"dfs.balancer.block-move.timeout";
+  public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
 
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = 
"dfs.mover.movedWinWidth";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 2037d01..583ade3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -282,13 +282,16 @@ public class Balancer {
     final long getBlocksMinBlockSize = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+    final int blockMoveTimeout = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e5c5e53..e090174 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -121,6 +121,7 @@ public class Dispatcher {
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
+  private final long blockMoveTimeout;
 
   private final int ioFileBufferSize;
 
@@ -331,6 +332,11 @@ public class Dispatcher {
                 getXferAddr(Dispatcher.this.connectToDnViaHostname)),
                 HdfsConstants.READ_TIMEOUT);
 
+        // Set read timeout so that it doesn't hang forever against
+        // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+        // twice within the client read timeout period (every 30 seconds by
+        // default). Here, we make it give up after 5 minutes of no response.
+        sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
         sock.setKeepAlive(true);
 
         OutputStream unbufOut = sock.getOutputStream();
@@ -386,13 +392,26 @@ public class Dispatcher {
           source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
     }
 
+    /** Check whether to stop waiting for a response */
+    private boolean stopWaitingForResponse(long startTime) {
+      return source.isIterationOver() ||
+          (blockMoveTimeout > 0 &&
+          (Time.monotonicNow() - startTime > blockMoveTimeout));
+    }
+
     /** Receive a reportedBlock copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
+      long startTime = Time.monotonicNow();
       BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(vintPrefixed(in));
       while (response.getStatus() == Status.IN_PROGRESS) {
         // read intermediate responses
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+        // Stop waiting for slow block moves. Even if it stops waiting,
+        // the actual move may continue.
+        if (stopWaitingForResponse(startTime)) {
+          throw new IOException("Block move timed out");
+        }
       }
       String logInfo = "reportedBlock move is failed";
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
@@ -671,6 +690,7 @@ public class Dispatcher {
 
     private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
+    private final long startTime = Time.monotonicNow();
     /**
      * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
      * because we want to keep one copy of a block and be aware that the
@@ -682,6 +702,13 @@ public class Dispatcher {
       dn.super(storageType, maxSize2Move);
     }
 
+    /**
+     * Check if the iteration is over
+     */
+    public boolean isIterationOver() {
+      return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
+    }
+
     /** Add a task */
     void addTask(Task task) {
       Preconditions.checkState(task.target != this,
@@ -838,24 +865,15 @@ public class Dispatcher {
      * elapsed time of the iteration has exceeded the max time limit.
      */
     private void dispatchBlocks() {
-      final long startTime = Time.monotonicNow();
       this.blocksToReceive = 2 * getScheduledSize();
-      boolean isTimeUp = false;
       int noPendingMoveIteration = 0;
-      while (!isTimeUp && getScheduledSize() > 0
+      while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(this + " blocksToReceive=" + blocksToReceive
               + ", scheduledSize=" + getScheduledSize()
               + ", srcBlocks#=" + srcBlocks.size());
         }
-        // check if time is up or not
-        if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
-          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
-              + " seconds).  Skipping " + this);
-          isTimeUp = true;
-          continue;
-        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter
@@ -902,6 +920,11 @@ public class Dispatcher {
         } catch (InterruptedException ignored) {
         }
       }
+
+      if (isIterationOver()) {
+        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+            + " seconds) has been reached. Stopping " + this);
+      }
     }
 
     @Override
@@ -921,13 +944,14 @@ public class Dispatcher {
       int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration 
conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, conf);
+        0L, 0L, 0,  conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
-      long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
+      long getBlocksSize, long getBlocksMinBlockSize,
+      int blockMoveTimeout, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -942,6 +966,7 @@ public class Dispatcher {
 
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+    this.blockMoveTimeout = blockMoveTimeout;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 483663e..61a7063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3228,6 +3228,21 @@
 </property>
 
 <property>
+  <name>dfs.balancer.block-move.timeout</name>
+  <value>0</value>
+  <description>
+    Maximum amount of time in milliseconds for a block to move. If this is set
+    greater than 0, Balancer will stop waiting for a block move completion
+    after this time. In typical clusters, a 3 to 5 minute timeout is reasonable.
+    If timeout happens to a large proportion of block moves, this needs to be
+    increased. It could also be that too much work is dispatched and many nodes
+    are constantly exceeding the bandwidth limit as a result. In that case,
+    other balancer parameters might need to be adjusted.
+    It is disabled (0) by default.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>
   <description>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to