[03/50] [abbrv] hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

2017-07-28 Thread inigoiri
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e

Branch: refs/heads/HDFS-10467
Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c
Parents: 3b48f81
Author: Kihwal Lee 
Authored: Fri Jul 21 09:14:19 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 09:14:19 2017 -0500

--
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++-
 1 file changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index f855e45..9270fde 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -121,6 +122,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -139,11 +141,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -155,9 +159,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -1017,6 +1031,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -1116,7 +1131,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1166,6 +1181,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for moving blocks to " +
+  targets.size() + " targets. Balancing may be slower.");
+} else {
+  if  (threadsPerTarget > maxConcurrentMovesPerNode) {
+threadsPerTarget = maxConcurrentMovesPerNode;
+LOG.info("Limiting threads per target to the specified max.");
+  }
+  moverThreadAllocator.setLotSize(threadsPerTarget);
+  LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+}
+
 long dSec = 0;
 final Iterator i = sources.iterator();
 for (int j = 0; j < futures.length; 

[07/34] hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

2017-07-27 Thread aengineer
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e

Branch: refs/heads/HDFS-7240
Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c
Parents: 3b48f81
Author: Kihwal Lee 
Authored: Fri Jul 21 09:14:19 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 09:14:19 2017 -0500

--
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++-
 1 file changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index f855e45..9270fde 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -121,6 +122,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -139,11 +141,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -155,9 +159,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -1017,6 +1031,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -1116,7 +1131,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1166,6 +1181,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for moving blocks to " +
+  targets.size() + " targets. Balancing may be slower.");
+} else {
+  if  (threadsPerTarget > maxConcurrentMovesPerNode) {
+threadsPerTarget = maxConcurrentMovesPerNode;
+LOG.info("Limiting threads per target to the specified max.");
+  }
+  moverThreadAllocator.setLotSize(threadsPerTarget);
+  LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+}
+
 long dSec = 0;
 final Iterator i = sources.iterator();
 for (int j = 0; j < futures.length; j++) 

hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee Updated CHANGES.txt (cherry picked from commit e229ffee64e8abd1df4d819b81151582ed3a16e2)

2017-07-21 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 1865cc5bd -> 62ce8f043


HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
Updated CHANGES.txt
(cherry picked from commit e229ffee64e8abd1df4d819b81151582ed3a16e2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/62ce8f04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/62ce8f04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/62ce8f04

Branch: refs/heads/branch-2.7
Commit: 62ce8f043065dfd6c98945cd2ce92b6825b16bef
Parents: 1865cc5
Author: Kihwal Lee 
Authored: Fri Jul 21 10:09:32 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 10:09:56 2017 -0500

--
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  2 ++
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++-
 2 files changed, 36 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ce8f04/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
--
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8b1bf44..5713513 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -394,6 +394,8 @@ Release 2.7.4 - UNRELEASED
 HDFS-12177. NameNode exits due to setting BlockPlacementPolicy loglevel to 
Debug.
 (Jiandan Yang via Brahma Reddy Battula)
 
+HDFS-11742. Improve balancer usability after HDFS-8818. (kihwal)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ce8f04/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 75c32f9..b3e591e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -118,6 +118,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -131,11 +132,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -147,9 +150,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -905,6 +918,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -999,7 +1013,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1050,6 +1064,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for 

hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

2017-07-21 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 f65dc6ee9 -> e229ffee6


HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

(cherry picked from commit 8e3a992eccff26a7344c3f0e719898fa97706b8c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e229ffee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e229ffee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e229ffee

Branch: refs/heads/branch-2.8
Commit: e229ffee64e8abd1df4d819b81151582ed3a16e2
Parents: f65dc6e
Author: Kihwal Lee 
Authored: Fri Jul 21 09:22:28 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 09:22:28 2017 -0500

--
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++-
 1 file changed, 34 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e229ffee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 36aef8e..33c6398 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -118,6 +118,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -136,11 +137,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -152,9 +155,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -921,6 +934,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -1024,7 +1038,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1075,6 +1089,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for moving blocks to " +
+  targets.size() + " targets. Balancing may be slower.");
+} else {
+  if  (threadsPerTarget > maxConcurrentMovesPerNode) {
+threadsPerTarget = maxConcurrentMovesPerNode;
+LOG.info("Limiting threads per target to the specified max.");
+  }
+  moverThreadAllocator.setLotSize(threadsPerTarget);
+  LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+}
+
 long dSec = 0;
 final Iterator i = sources.iterator();
 for (int j = 0; j < futures.length; j++) {


-
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org



hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

2017-07-21 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 af2fb6a90 -> c12bf9a12


HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

(cherry picked from commit 8e3a992eccff26a7344c3f0e719898fa97706b8c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c12bf9a1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c12bf9a1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c12bf9a1

Branch: refs/heads/branch-2
Commit: c12bf9a12875e5f5c0c8e4a15f78e4812a0dd268
Parents: af2fb6a
Author: Kihwal Lee 
Authored: Fri Jul 21 09:19:01 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 09:19:01 2017 -0500

--
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 35 +++-
 1 file changed, 34 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c12bf9a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index fd09474..444984b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -118,6 +118,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -136,11 +137,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -152,9 +155,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -921,6 +934,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -1021,7 +1035,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1072,6 +1086,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for moving blocks to " +
+  targets.size() + " targets. Balancing may be slower.");
+} else {
+  if  (threadsPerTarget > maxConcurrentMovesPerNode) {
+threadsPerTarget = maxConcurrentMovesPerNode;
+LOG.info("Limiting threads per target to the specified max.");
+  }
+  moverThreadAllocator.setLotSize(threadsPerTarget);
+  LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+}
+
 long dSec = 0;
 final Iterator i = sources.iterator();
 for (int j = 0; j < futures.length; j++) {


-
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org



hadoop git commit: HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

2017-07-21 Thread kihwal
Repository: hadoop
Updated Branches:
  refs/heads/trunk 3b48f8141 -> 8e3a992ec


HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e

Branch: refs/heads/trunk
Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c
Parents: 3b48f81
Author: Kihwal Lee 
Authored: Fri Jul 21 09:14:19 2017 -0500
Committer: Kihwal Lee 
Committed: Fri Jul 21 09:14:19 2017 -0500

--
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++-
 1 file changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
--
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index f855e45..9270fde 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -121,6 +122,7 @@ public class Dispatcher {
 
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
+  private final int maxMoverThreads;
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
@@ -139,11 +141,13 @@ public class Dispatcher {
   static class Allocator {
 private final int max;
 private int count = 0;
+private int lotSize = 1;
 
 Allocator(int max) {
   this.max = max;
 }
 
+/** Allocate specified number of items */
 synchronized int allocate(int n) {
   final int remaining = max - count;
   if (remaining <= 0) {
@@ -155,9 +159,19 @@ public class Dispatcher {
   }
 }
 
+/** Allocate a single lot of items */
+int allocate() {
+  return allocate(lotSize);
+}
+
 synchronized void reset() {
   count = 0;
 }
+
+/** Set the lot size */
+synchronized void setLotSize(int lotSize) {
+  this.lotSize = lotSize;
+}
   }
 
   private static class GlobalBlockMap {
@@ -1017,6 +1031,7 @@ public class Dispatcher {
 this.dispatchExecutor = dispatcherThreads == 0? null
 : Executors.newFixedThreadPool(dispatcherThreads);
 this.moverThreadAllocator = new Allocator(moverThreads);
+this.maxMoverThreads = moverThreads;
 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
 this.getBlocksSize = getBlocksSize;
@@ -1116,7 +1131,7 @@ public class Dispatcher {
 final DDatanode targetDn = p.target.getDDatanode();
 ExecutorService moveExecutor = targetDn.getMoveExecutor();
 if (moveExecutor == null) {
-  final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+  final int nThreads = moverThreadAllocator.allocate();
   if (nThreads > 0) {
 moveExecutor = targetDn.initMoveExecutor(nThreads);
   }
@@ -1166,6 +1181,25 @@ public class Dispatcher {
   LOG.debug("Disperse Interval sec = " +
   concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
 }
+
+// Determine the size of each mover thread pool per target
+int threadsPerTarget = maxMoverThreads/targets.size();
+if (threadsPerTarget == 0) {
+  // Some scheduled moves will get ignored as some targets won't have
+  // any threads allocated.
+  moverThreadAllocator.setLotSize(1);
+  LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+  maxMoverThreads + " is too small for moving blocks to " +
+  targets.size() + " targets. Balancing may be slower.");
+} else {
+  if  (threadsPerTarget > maxConcurrentMovesPerNode) {
+threadsPerTarget = maxConcurrentMovesPerNode;
+LOG.info("Limiting threads per target to the specified max.");
+  }
+  moverThreadAllocator.setLotSize(threadsPerTarget);
+  LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+}
+
 long dSec = 0;
 final