Repository: hadoop
Updated Branches:
  refs/heads/trunk 09ef97dcc -> f6367c5f4
HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6367c5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6367c5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6367c5f

Branch: refs/heads/trunk
Commit: f6367c5f44a88cb5eb7edffb015b10b657504a61
Parents: 09ef97d
Author: Zhe Zhang <z...@apache.org>
Authored: Tue Oct 25 10:18:57 2016 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Oct 25 10:19:13 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  2 +
 .../hadoop/hdfs/server/balancer/Balancer.java   |  5 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 49 +++++++++++++++-----
 .../src/main/resources/hdfs-default.xml         | 15 ++++++
 4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d54c109..951ad68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -496,6 +496,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
   public static final String  DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file";
   public static final String  DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
+  public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
+  public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 2037d01..583ade3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -282,13 +282,16 @@ public class Balancer {
     final long getBlocksMinBlockSize = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+    final int blockMoveTimeout = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher = new Dispatcher(theblockpool, p.getIncludedNodes(),
         p.getExcludedNodes(), movedWinWidth, moverThreads,
         dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-        getBlocksMinBlockSize, conf);
+        getBlocksMinBlockSize, blockMoveTimeout, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
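The new key follows the usual Hadoop Configuration pattern: the Balancer reads dfs.balancer.block-move.timeout as a plain int of milliseconds, with 0 meaning disabled. A minimal sketch of setting and reading the key programmatically, assuming only hadoop-common's Configuration on the classpath (the class and variable names below are illustrative, not part of the patch):

  import org.apache.hadoop.conf.Configuration;

  public class BlockMoveTimeoutExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Enable a 5-minute block-move timeout; the value is in milliseconds.
      conf.setInt("dfs.balancer.block-move.timeout", 5 * 60 * 1000);
      // 0 (the shipped default) means the timeout is disabled.
      int blockMoveTimeout = conf.getInt("dfs.balancer.block-move.timeout", 0);
      System.out.println("block-move timeout = " + blockMoveTimeout + " ms");
    }
  }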
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e5c5e53..e090174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -121,6 +121,7 @@ public class Dispatcher {
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
+  private final long blockMoveTimeout;
 
   private final int ioFileBufferSize;
 
@@ -331,6 +332,11 @@ public class Dispatcher {
             getXferAddr(Dispatcher.this.connectToDnViaHostname)),
             HdfsConstants.READ_TIMEOUT);
+        // Set read timeout so that it doesn't hang forever against
+        // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+        // twice within the client read timeout period (every 30 seconds by
+        // default). Here, we make it give up after 5 minutes of no response.
+        sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
         sock.setKeepAlive(true);
 
         OutputStream unbufOut = sock.getOutputStream();
@@ -386,13 +392,26 @@ public class Dispatcher {
           source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
     }
 
+    /** Check whether to continue waiting for response */
+    private boolean stopWaitingForResponse(long startTime) {
+      return source.isIterationOver() ||
+          (blockMoveTimeout > 0 &&
+          (Time.monotonicNow() - startTime > blockMoveTimeout));
+    }
+
     /** Receive a reportedBlock copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
+      long startTime = Time.monotonicNow();
       BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(vintPrefixed(in));
       while (response.getStatus() == Status.IN_PROGRESS) {
         // read intermediate responses
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+        // Stop waiting for slow block moves. Even if it stops waiting,
+        // the actual move may continue.
+        if (stopWaitingForResponse(startTime)) {
+          throw new IOException("Block move timed out");
+        }
       }
       String logInfo = "reportedBlock move is failed";
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
@@ -671,6 +690,7 @@ public class Dispatcher {
     private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
+    private final long startTime = Time.monotonicNow();
 
     /**
      * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
      * because we want to keep one copy of a block and be aware that the
@@ -682,6 +702,13 @@ public class Dispatcher {
       dn.super(storageType, maxSize2Move);
     }
 
+    /**
+     * Check if the iteration is over
+     */
+    public boolean isIterationOver() {
+      return (Time.monotonicNow() - startTime > MAX_ITERATION_TIME);
+    }
+
     /** Add a task */
     void addTask(Task task) {
       Preconditions.checkState(task.target != this,
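The core of the change is visible in receiveResponse above: the wait for a terminal block-op status is now bounded, so a datanode that keeps emitting IN_PROGRESS past the deadline makes the mover thread bail out with an IOException, while the move itself may still finish on the datanode side. A self-contained sketch of that bounded-wait pattern, with a hypothetical ResponseReader standing in for the protobuf response stream and System.nanoTime standing in for Hadoop's Time.monotonicNow:

  import java.io.IOException;
  import java.util.concurrent.TimeUnit;

  public class BoundedResponseWait {
    /** Hypothetical stand-in for pulling one status off the response stream. */
    interface ResponseReader {
      String readStatus() throws IOException;
    }

    /**
     * Waits for a terminal status, giving up once timeoutMs elapses.
     * A timeout of 0 (the balancer's default) disables the deadline.
     */
    static String awaitFinalStatus(ResponseReader in, long timeoutMs)
        throws IOException {
      final long deadline = System.nanoTime()
          + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      String status = in.readStatus();
      while ("IN_PROGRESS".equals(status)) {
        status = in.readStatus();
        // Stop blocking this mover thread on a slow move; the datanode-side
        // move may still complete even though we gave up waiting for it.
        if (timeoutMs > 0 && System.nanoTime() > deadline) {
          throw new IOException("Block move timed out after " + timeoutMs + " ms");
        }
      }
      return status;
    }
  }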
@@ -838,24 +865,15 @@ public class Dispatcher {
      * elapsed time of the iteration has exceeded the max time limit.
      */
     private void dispatchBlocks() {
-      final long startTime = Time.monotonicNow();
       this.blocksToReceive = 2 * getScheduledSize();
-      boolean isTimeUp = false;
       int noPendingMoveIteration = 0;
-      while (!isTimeUp && getScheduledSize() > 0
+      while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(this + " blocksToReceive=" + blocksToReceive
               + ", scheduledSize=" + getScheduledSize()
               + ", srcBlocks#=" + srcBlocks.size());
         }
-        // check if time is up or not
-        if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
-          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
-              + " seconds). Skipping " + this);
-          isTimeUp = true;
-          continue;
-        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter
@@ -902,6 +920,11 @@ public class Dispatcher {
         } catch (InterruptedException ignored) {
         }
       }
+
+      if (isIterationOver()) {
+        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+            + " seconds) has been reached. Stopping " + this);
+      }
     }
 
     @Override
@@ -921,13 +944,14 @@ public class Dispatcher {
       int dispatcherThreads, int maxConcurrentMovesPerNode,
       Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, conf);
+        0L, 0L, 0, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
-      long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
+      long getBlocksSize, long getBlocksMinBlockSize,
+      int blockMoveTimeout, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -942,6 +966,7 @@ public class Dispatcher {
 
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+    this.blockMoveTimeout = blockMoveTimeout;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
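Note the refactoring in dispatchBlocks: the per-call startTime local and isTimeUp flag are replaced by a startTime field captured when the Source is created, so the same isIterationOver() predicate can be consulted both by the dispatch loop and by stopWaitingForResponse. A minimal sketch of that pattern, assuming a 20-minute cap along the lines of the dispatcher's MAX_ITERATION_TIME (the class name and constant value here are illustrative):

  public class IterationClock {
    // Illustrative cap; Dispatcher defines its own MAX_ITERATION_TIME.
    private static final long MAX_ITERATION_TIME_MS = 20 * 60 * 1000L;

    // Captured once at construction, like Source's startTime field.
    private final long startTimeMs = monotonicNowMs();

    /** Monotonic clock, immune to wall-clock adjustments. */
    static long monotonicNowMs() {
      return System.nanoTime() / 1_000_000L;
    }

    /** True once the iteration has run past the cap. */
    boolean isIterationOver() {
      return monotonicNowMs() - startTimeMs > MAX_ITERATION_TIME_MS;
    }
  }

Capturing the start time in a field is what lets a timed-out PendingMove and the main loop agree on when the iteration ends, instead of each thread tracking its own flag.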
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 483663e..61a7063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3228,6 +3228,21 @@
 </property>
 
 <property>
+  <name>dfs.balancer.block-move.timeout</name>
+  <value>0</value>
+  <description>
+    Maximum amount of time in milliseconds for a block to move. If this is set
+    greater than 0, Balancer will stop waiting for a block move completion
+    after this time. In typical clusters, a 3 to 5 minute timeout is reasonable.
+    If timeout happens to a large proportion of block moves, this needs to be
+    increased. It could also be that too much work is dispatched and many nodes
+    are constantly exceeding the bandwidth limit as a result. In that case,
+    other balancer parameters might need to be adjusted.
+    It is disabled (0) by default.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>
   <description>
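To act on the 3-to-5-minute suggestion in the description above, an operator would override the default in hdfs-site.xml on the host that runs the Balancer; the 300000 below (5 minutes, in milliseconds) is only an example value:

  <!-- hdfs-site.xml on the Balancer host: bound each block move to 5 minutes -->
  <property>
    <name>dfs.balancer.block-move.timeout</name>
    <value>300000</value>
  </property>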