Repository: hadoop Updated Branches: refs/heads/branch-2 af2fb6a90 -> c12bf9a12
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee (cherry picked from commit 8e3a992eccff26a7344c3f0e719898fa97706b8c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c12bf9a1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c12bf9a1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c12bf9a1 Branch: refs/heads/branch-2 Commit: c12bf9a12875e5f5c0c8e4a15f78e4812a0dd268 Parents: af2fb6a Author: Kihwal Lee <kih...@apache.org> Authored: Fri Jul 21 09:19:01 2017 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Fri Jul 21 09:19:01 2017 -0500 ---------------------------------------------------------------------- .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c12bf9a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index fd09474..444984b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -118,6 +118,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -136,11 +137,13 @@ public class Dispatcher { static class Allocator { private final int max; 
private int count = 0; + private int lotSize = 1; Allocator(int max) { this.max = max; } + /** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -152,9 +155,19 @@ public class Dispatcher { } } + /** Allocate a single lot of items */ + int allocate() { + return allocate(lotSize); + } + synchronized void reset() { count = 0; } + + /** Set the lot size */ + synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; + } } private static class GlobalBlockMap { @@ -921,6 +934,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); + this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1021,7 +1035,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1072,6 +1086,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + + // Determine the size of each mover thread pool per target + int threadsPerTarget = maxMoverThreads/targets.size(); + if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); + } else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { + threadsPerTarget = maxConcurrentMovesPerNode; + LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); + } + long dSec = 0; final Iterator<Source> i = sources.iterator(); for (int j = 0; j < futures.length; j++) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org