[03/50] [abbrv] hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e Branch: refs/heads/HDFS-10467 Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c Parents: 3b48f81 Author: Kihwal Lee Authored: Fri Jul 21 09:14:19 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 09:14:19 2017 -0500 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++- 1 file changed, 35 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index f855e45..9270fde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -121,6 +122,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -139,11 +141,13 @@ public class
Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -155,9 +159,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -1017,6 +1031,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1116,7 +1131,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1166,6 +1181,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); +} else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { +threadsPerTarget = maxConcurrentMovesPerNode; +LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); +} + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length;
[07/34] hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e Branch: refs/heads/HDFS-7240 Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c Parents: 3b48f81 Author: Kihwal Lee Authored: Fri Jul 21 09:14:19 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 09:14:19 2017 -0500 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++- 1 file changed, 35 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index f855e45..9270fde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -121,6 +122,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -139,11 +141,13 @@ public class
Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -155,9 +159,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -1017,6 +1031,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1116,7 +1131,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1166,6 +1181,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); +} else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { +threadsPerTarget = maxConcurrentMovesPerNode; +LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); +} + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++)
hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Updated CHANGES.txt (cherry picked from commit e229ffee64e8abd1df4d819b81151582ed3a16e2)
Repository: hadoop Updated Branches: refs/heads/branch-2.7 1865cc5bd -> 62ce8f043 HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Updated CHANGES.txt (cherry picked from commit e229ffee64e8abd1df4d819b81151582ed3a16e2) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/62ce8f04 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/62ce8f04 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/62ce8f04 Branch: refs/heads/branch-2.7 Commit: 62ce8f043065dfd6c98945cd2ce92b6825b16bef Parents: 1865cc5 Author: Kihwal Lee Authored: Fri Jul 21 10:09:32 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 10:09:56 2017 -0500 -- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++ .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++- 2 files changed, 36 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ce8f04/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8b1bf44..5713513 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -394,6 +394,8 @@ Release 2.7.4 - UNRELEASED HDFS-12177. NameNode exits due to setting BlockPlacementPolicy loglevel to Debug. (Jiandan Yang via Brahma Reddy Battula) +HDFS-11742. Improve balancer usability after HDFS-8818.
(kihwal) + Release 2.7.3 - 2016-08-25 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ce8f04/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 75c32f9..b3e591e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -118,6 +118,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -131,11 +132,13 @@ public class Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -147,9 +150,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -905,6 +918,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? 
null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -999,7 +1013,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1050,6 +1064,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for
hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
Repository: hadoop Updated Branches: refs/heads/branch-2.8 f65dc6ee9 -> e229ffee6 HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee (cherry picked from commit 8e3a992eccff26a7344c3f0e719898fa97706b8c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e229ffee Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e229ffee Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e229ffee Branch: refs/heads/branch-2.8 Commit: e229ffee64e8abd1df4d819b81151582ed3a16e2 Parents: f65dc6e Author: Kihwal Lee Authored: Fri Jul 21 09:22:28 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 09:22:28 2017 -0500 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++- 1 file changed, 34 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e229ffee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 36aef8e..33c6398 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -118,6 +118,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -136,11 +137,13 @@ public class Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int
remaining = max - count; if (remaining <= 0) { @@ -152,9 +155,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -921,6 +934,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1024,7 +1038,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1075,6 +1089,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); +} else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { +threadsPerTarget = maxConcurrentMovesPerNode; +LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); +} + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { - To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
Repository: hadoop Updated Branches: refs/heads/branch-2 af2fb6a90 -> c12bf9a12 HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee (cherry picked from commit 8e3a992eccff26a7344c3f0e719898fa97706b8c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c12bf9a1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c12bf9a1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c12bf9a1 Branch: refs/heads/branch-2 Commit: c12bf9a12875e5f5c0c8e4a15f78e4812a0dd268 Parents: af2fb6a Author: Kihwal Lee Authored: Fri Jul 21 09:19:01 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 09:19:01 2017 -0500 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++- 1 file changed, 34 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c12bf9a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index fd09474..444984b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -118,6 +118,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -136,11 +137,13 @@ public class Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining
= max - count; if (remaining <= 0) { @@ -152,9 +155,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -921,6 +934,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1021,7 +1035,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1072,6 +1086,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); +} else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { +threadsPerTarget = maxConcurrentMovesPerNode; +LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); +} + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { - To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
Repository: hadoop Updated Branches: refs/heads/trunk 3b48f8141 -> 8e3a992ec HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e Branch: refs/heads/trunk Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c Parents: 3b48f81 Author: Kihwal Lee Authored: Fri Jul 21 09:14:19 2017 -0500 Committer: Kihwal Lee Committed: Fri Jul 21 09:14:19 2017 -0500 -- .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++- 1 file changed, 35 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index f855e45..9270fde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -121,6 +122,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize;
private final long getBlocksMinBlockSize; @@ -139,11 +141,13 @@ public class Dispatcher { static class Allocator { private final int max; private int count = 0; +private int lotSize = 1; Allocator(int max) { this.max = max; } +/** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -155,9 +159,19 @@ public class Dispatcher { } } +/** Aloocate a single lot of items */ +int allocate() { + return allocate(lotSize); +} + synchronized void reset() { count = 0; } + +/** Set the lot size */ +synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; +} } private static class GlobalBlockMap { @@ -1017,6 +1031,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); +this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1116,7 +1131,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1166,6 +1181,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + +// Determine the size of each mover thread pool per target +int threadsPerTarget = maxMoverThreads/targets.size(); +if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. 
Balancing may be slower."); +} else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { +threadsPerTarget = maxConcurrentMovesPerNode; +LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); +} + long dSec = 0; final