Author: jing9
Date: Mon Aug 11 18:02:57 2014
New Revision: 1617338
URL: http://svn.apache.org/r1617338
Log:
HDFS-6837. Merge r1617337 from trunk.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
- copied unchanged from r1617337,
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Aug 11 18:02:57 2014
@@ -129,6 +129,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
jing9)
+ HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via
+ jing9)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug 11 18:02:57 2014
@@ -44,7 +44,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
@@ -185,10 +186,10 @@ public class Balancer {
// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
- private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized
- = new LinkedList<BalancerDatanode.StorageGroup>();
- private final Collection<BalancerDatanode.StorageGroup> underUtilized
- = new LinkedList<BalancerDatanode.StorageGroup>();
+ private final Collection<StorageGroup> belowAvgUtilized
+ = new LinkedList<StorageGroup>();
+ private final Collection<StorageGroup> underUtilized
+ = new LinkedList<StorageGroup>();
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
@@ -210,8 +211,22 @@ public class Balancer {
* when connection fails.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+ final long movedWinWidth = conf.getLong(
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+ final int moverThreads = conf.getInt(
+ DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
+ final int dispatcherThreads = conf.getInt(
+ DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
+ DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
+ final int maxConcurrentMovesPerNode = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
- p.nodesToBeExcluded, conf);
+ p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+ maxConcurrentMovesPerNode, conf);
this.threshold = p.threshold;
this.policy = p.policy;
}
@@ -256,7 +271,7 @@ public class Balancer {
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
- final BalancerDatanode dn = dispatcher.newDatanode(r);
+ final DDatanode dn = dispatcher.newDatanode(r);
for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
@@ -269,9 +284,9 @@ public class Balancer {
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, threshold);
- final BalancerDatanode.StorageGroup g;
+ final StorageGroup g;
if (utilizationDiff > 0) {
- final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher);
+ final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s);
} else {
@@ -280,7 +295,7 @@ public class Balancer {
}
g = s;
} else {
- g = dn.addStorageGroup(t, utilization, maxSize2Move);
+ g = dn.addStorageGroup(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
@@ -329,7 +344,7 @@ public class Balancer {
logUtilizationCollection("underutilized", underUtilized);
}
- private static <T extends BalancerDatanode.StorageGroup>
+ private static <T extends StorageGroup>
void logUtilizationCollection(String name, Collection<T> items) {
LOG.info(items.size() + " " + name + ": " + items);
}
@@ -382,8 +397,7 @@ public class Balancer {
* datanodes or the candidates are source nodes with (utilization > Avg), and
* the others are target nodes with (utilization < Avg).
*/
- private <G extends BalancerDatanode.StorageGroup,
- C extends BalancerDatanode.StorageGroup>
+ private <G extends StorageGroup, C extends StorageGroup>
void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
Matcher matcher) {
for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
@@ -399,9 +413,8 @@ public class Balancer {
* For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidates is chosen.
*/
- private <C extends BalancerDatanode.StorageGroup>
- boolean choose4One(BalancerDatanode.StorageGroup g,
- Collection<C> candidates, Matcher matcher) {
+ private <C extends StorageGroup> boolean choose4One(StorageGroup g,
+ Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator();
final C chosen = chooseCandidate(g, i, matcher);
@@ -419,8 +432,7 @@ public class Balancer {
return true;
}
- private void matchSourceWithTargetToMove(Source source,
- BalancerDatanode.StorageGroup target) {
+ private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(),
target.availableSizeToMove());
final Task task = new Task(target, size);
source.addTask(task);
@@ -431,8 +443,7 @@ public class Balancer {
}
/** Choose a candidate for the given datanode. */
- private <G extends BalancerDatanode.StorageGroup,
- C extends BalancerDatanode.StorageGroup>
+ private <G extends StorageGroup, C extends StorageGroup>
C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
if (g.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) {
@@ -440,7 +451,7 @@ public class Balancer {
if (!c.hasSpaceForScheduling()) {
candidates.remove();
} else if (matcher.match(dispatcher.getCluster(),
- g.getDatanode(), c.getDatanode())) {
+ g.getDatanodeInfo(), c.getDatanodeInfo())) {
return c;
}
}
@@ -458,34 +469,15 @@ public class Balancer {
dispatcher.reset(conf);;
}
- // Exit status
- enum ReturnStatus {
- // These int values will map directly to the balancer process's exit code.
- SUCCESS(0),
- IN_PROGRESS(1),
- ALREADY_RUNNING(-1),
- NO_MOVE_BLOCK(-2),
- NO_MOVE_PROGRESS(-3),
- IO_EXCEPTION(-4),
- ILLEGAL_ARGS(-5),
- INTERRUPTED(-6);
-
- final int code;
-
- ReturnStatus(int code) {
- this.code = code;
- }
- }
-
/** Run an iteration for all datanodes. */
- private ReturnStatus run(int iteration, Formatter formatter,
+ private ExitStatus run(int iteration, Formatter formatter,
Configuration conf) {
try {
final List<DatanodeStorageReport> reports = dispatcher.init();
final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
- return ReturnStatus.SUCCESS;
+ return ExitStatus.SUCCESS;
} else {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." );
@@ -499,7 +491,7 @@ public class Balancer {
final long bytesToMove = chooseStorageGroups();
if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting...");
- return ReturnStatus.NO_MOVE_BLOCK;
+ return ExitStatus.NO_MOVE_BLOCK;
} else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
" in this iteration");
@@ -520,19 +512,19 @@ public class Balancer {
* Exit no byte has been moved for 5 consecutive iterations.
*/
if (!dispatcher.dispatchAndCheckContinue()) {
- return ReturnStatus.NO_MOVE_PROGRESS;
+ return ExitStatus.NO_MOVE_PROGRESS;
}
- return ReturnStatus.IN_PROGRESS;
+ return ExitStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.ILLEGAL_ARGS;
+ return ExitStatus.ILLEGAL_ARGUMENTS;
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.IO_EXCEPTION;
+ return ExitStatus.IO_EXCEPTION;
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.INTERRUPTED;
+ return ExitStatus.INTERRUPTED;
} finally {
dispatcher.shutdownNow();
}
@@ -571,14 +563,14 @@ public class Balancer {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
- final ReturnStatus r = b.run(iteration, formatter, conf);
+ final ExitStatus r = b.run(iteration, formatter, conf);
// clean all lists
b.resetData(conf);
- if (r == ReturnStatus.IN_PROGRESS) {
+ if (r == ExitStatus.IN_PROGRESS) {
done = false;
- } else if (r != ReturnStatus.SUCCESS) {
+ } else if (r != ExitStatus.SUCCESS) {
//must be an error statue, return.
- return r.code;
+ return r.getExitCode();
}
}
@@ -591,7 +583,7 @@ public class Balancer {
nnc.close();
}
}
- return ReturnStatus.SUCCESS.code;
+ return ExitStatus.SUCCESS.getExitCode();
}
/* Given elaspedTime in ms, return a printable string */
@@ -662,10 +654,10 @@ public class Balancer {
return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.IO_EXCEPTION.code;
+ return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.INTERRUPTED.code;
+ return ExitStatus.INTERRUPTED.getExitCode();
} finally {
System.out.format("%-24s ",
DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Mon Aug 11 18:02:57 2014
@@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
@@ -91,7 +91,6 @@ public class Dispatcher {
// minutes
private final NameNodeConnector nnc;
- private final KeyManager keyManager;
private final SaslDataTransferClient saslClient;
/** Set of datanodes to be excluded. */
@@ -100,11 +99,10 @@ public class Dispatcher {
private final Set<String> includedNodes;
private final Collection<Source> sources = new HashSet<Source>();
- private final Collection<BalancerDatanode.StorageGroup> targets
- = new HashSet<BalancerDatanode.StorageGroup>();
+ private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();
private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
- private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
+ private final MovedBlocks<StorageGroup> movedBlocks;
/** Map (datanodeUuid,storageType -> StorageGroup) */
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
@@ -135,8 +133,7 @@ public class Dispatcher {
}
/** Remove all blocks except for the moved blocks. */
- private void removeAllButRetain(
- MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) {
+ private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
if (!movedBlocks.contains(i.next())) {
i.remove();
@@ -150,17 +147,15 @@ public class Dispatcher {
return datanodeUuid + ":" + storageType;
}
- private final Map<String, BalancerDatanode.StorageGroup> map
- = new HashMap<String, BalancerDatanode.StorageGroup>();
+ private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
- BalancerDatanode.StorageGroup get(String datanodeUuid,
- StorageType storageType) {
+ StorageGroup get(String datanodeUuid, StorageType storageType) {
return map.get(toKey(datanodeUuid, storageType));
}
- void put(BalancerDatanode.StorageGroup g) {
- final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
- final BalancerDatanode.StorageGroup existing = map.put(key, g);
+ void put(StorageGroup g) {
+ final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
+ final StorageGroup existing = map.put(key, g);
Preconditions.checkState(existing == null);
}
@@ -177,8 +172,8 @@ public class Dispatcher {
private class PendingMove {
private DBlock block;
private Source source;
- private BalancerDatanode proxySource;
- private BalancerDatanode.StorageGroup target;
+ private DDatanode proxySource;
+ private StorageGroup target;
private PendingMove() {
}
@@ -235,24 +230,24 @@ public class Dispatcher {
* @return true if a proxy is found; otherwise false
*/
private boolean chooseProxySource() {
- final DatanodeInfo targetDN = target.getDatanode();
+ final DatanodeInfo targetDN = target.getDatanodeInfo();
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)
+ for (StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
&& addTo(loc)) {
return true;
}
}
}
// check if there is replica which is on the same rack with the target
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+ for (StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
return true;
}
}
// find out a non-busy replica
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+ for (StorageGroup loc : block.getLocations()) {
if (addTo(loc)) {
return true;
}
@@ -261,10 +256,10 @@ public class Dispatcher {
}
/** add to a proxy source for specific block movement */
- private boolean addTo(BalancerDatanode.StorageGroup g) {
- final BalancerDatanode bdn = g.getBalancerDatanode();
- if (bdn.addPendingBlock(this)) {
- proxySource = bdn;
+ private boolean addTo(StorageGroup g) {
+ final DDatanode dn = g.getDDatanode();
+ if (dn.addPendingBlock(this)) {
+ proxySource = dn;
return true;
}
return false;
@@ -281,14 +276,13 @@ public class Dispatcher {
DataInputStream in = null;
try {
sock.connect(
- NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
+ NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
/*
* Unfortunately we don't have a good way to know if the Datanode is
* taking a really long time to move a block, OR something has gone
* wrong and it's never going to finish. To deal with this scenario, we
- * set a long timeout (20 minutes) to avoid hanging the balancer
- * indefinitely.
+ * set a long timeout (20 minutes) to avoid hanging indefinitely.
*/
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
@@ -298,9 +292,10 @@ public class Dispatcher {
InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
block.getBlock());
- Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
+ final KeyManager km = nnc.getKeyManager();
+ Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, keyManager, accessToken, target.getDatanode());
+ unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -314,21 +309,19 @@ public class Dispatcher {
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
- /*
- * proxy or target may have an issue, insert a small delay before using
- * these nodes further. This avoids a potential storm of
- * "threads quota exceeded" Warnings when the balancer gets out of sync
- * with work going on in datanode.
- */
+ // Proxy or target may have some issues, delay before using these nodes
+ // further in order to avoid a potential storm of "threads quota
+ // exceeded" warnings when the dispatcher gets out of sync with work
+ // going on in datanodes.
proxySource.activateDelay(DELAY_AFTER_ERROR);
- target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
+ target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
proxySource.removePendingBlock(this);
- target.getBalancerDatanode().removePendingBlock(this);
+ target.getDDatanode().removePendingBlock(this);
synchronized (this) {
reset();
@@ -342,8 +335,8 @@ public class Dispatcher {
/** Send a block replace request to the output stream */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken) throws IOException {
- new Sender(out).replaceBlock(eb, target.storageType, accessToken, source
- .getDatanode().getDatanodeUuid(), proxySource.datanode);
+ new Sender(out).replaceBlock(eb, target.storageType, accessToken,
+ source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
/** Receive a block copy response from the input stream */
@@ -368,8 +361,7 @@ public class Dispatcher {
}
/** A class for keeping track of block locations in the dispatcher. */
- private static class DBlock extends
- MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
+ private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
DBlock(Block block) {
super(block);
}
@@ -377,10 +369,10 @@ public class Dispatcher {
/** The class represents a desired move. */
static class Task {
- private final BalancerDatanode.StorageGroup target;
+ private final StorageGroup target;
private long size; // bytes scheduled to move
- Task(BalancerDatanode.StorageGroup target, long size) {
+ Task(StorageGroup target, long size) {
this.target = target;
this.size = size;
}
@@ -391,28 +383,25 @@ public class Dispatcher {
}
/** A class that keeps track of a datanode. */
- static class BalancerDatanode {
+ static class DDatanode {
/** A group of storages in a datanode with the same storage type. */
class StorageGroup {
final StorageType storageType;
- final double utilization;
final long maxSize2Move;
private long scheduledSize = 0L;
- private StorageGroup(StorageType storageType, double utilization,
- long maxSize2Move) {
+ private StorageGroup(StorageType storageType, long maxSize2Move) {
this.storageType = storageType;
- this.utilization = utilization;
this.maxSize2Move = maxSize2Move;
}
- BalancerDatanode getBalancerDatanode() {
- return BalancerDatanode.this;
+ private DDatanode getDDatanode() {
+ return DDatanode.this;
}
- DatanodeInfo getDatanode() {
- return BalancerDatanode.this.datanode;
+ DatanodeInfo getDatanodeInfo() {
+ return DDatanode.this.datanode;
}
/** Decide if still need to move more bytes */
@@ -447,7 +436,7 @@ public class Dispatcher {
@Override
public String toString() {
- return "" + utilization;
+ return getDisplayName();
}
}
@@ -461,10 +450,10 @@ public class Dispatcher {
@Override
public String toString() {
- return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
+ return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
}
- private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
+ private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
this.datanode = r.getDatanodeInfo();
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
@@ -475,18 +464,14 @@ public class Dispatcher {
Preconditions.checkState(existing == null);
}
- StorageGroup addStorageGroup(StorageType storageType, double utilization,
- long maxSize2Move) {
- final StorageGroup g = new StorageGroup(storageType, utilization,
- maxSize2Move);
+ StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
+ final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
put(storageType, g);
return g;
}
- Source addSource(StorageType storageType, double utilization,
- long maxSize2Move, Dispatcher balancer) {
- final Source s = balancer.new Source(storageType, utilization,
- maxSize2Move, this);
+ Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
+ final Source s = d.new Source(storageType, maxSize2Move, this);
put(storageType, s);
return s;
}
@@ -528,7 +513,7 @@ public class Dispatcher {
}
/** A node that can be the sources of a block move */
- class Source extends BalancerDatanode.StorageGroup {
+ class Source extends DDatanode.StorageGroup {
private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
@@ -539,9 +524,8 @@ public class Dispatcher {
*/
private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
- private Source(StorageType storageType, double utilization,
- long maxSize2Move, BalancerDatanode dn) {
- dn.super(storageType, utilization, maxSize2Move);
+ private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
+ dn.super(storageType, maxSize2Move);
}
/** Add a task */
@@ -565,7 +549,7 @@ public class Dispatcher {
*/
private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
- final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size);
+ final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks.getBlocks()) {
@@ -579,7 +563,7 @@ public class Dispatcher {
final String[] datanodeUuids = blk.getDatanodeUuids();
final StorageType[] storageTypes = blk.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) {
- final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+ final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown
block.addLocation(g);
@@ -617,7 +601,7 @@ public class Dispatcher {
private PendingMove chooseNextMove() {
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
final Task task = i.next();
- final BalancerDatanode target = task.target.getBalancerDatanode();
+ final DDatanode target = task.target.getDDatanode();
PendingMove pendingBlock = new PendingMove();
if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
@@ -670,7 +654,7 @@ public class Dispatcher {
final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false;
- int noPendingBlockIteration = 0;
+ int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
final PendingMove p = chooseNextMove();
@@ -699,11 +683,11 @@ public class Dispatcher {
return;
}
} else {
- // source node cannot find a pendingBlockToMove, iteration +1
- noPendingBlockIteration++;
+ // source node cannot find a pending block to move, iteration +1
+ noPendingMoveIteration++;
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
- if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+ if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
resetScheduledSize();
}
}
@@ -726,29 +710,19 @@ public class Dispatcher {
}
}
- Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes,
- Set<String> excludedNodes, Configuration conf) {
- this.nnc = theblockpool;
- this.keyManager = nnc.getKeyManager();
+ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
+ Set<String> excludedNodes, long movedWinWidth, int moverThreads,
+ int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
+ this.nnc = nnc;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
-
- final long movedWinWidth = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
- movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
+ this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
this.cluster = NetworkTopology.getInstance(conf);
- this.moveExecutor = Executors.newFixedThreadPool(conf.getInt(
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
- this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt(
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
- this.maxConcurrentMovesPerNode = conf.getInt(
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+ this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
+ this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
+ this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
@@ -784,7 +758,7 @@ public class Dispatcher {
return b;
}
- void add(Source source, BalancerDatanode.StorageGroup target) {
+ void add(Source source, StorageGroup target) {
sources.add(source);
targets.add(target);
}
@@ -826,8 +800,8 @@ public class Dispatcher {
return trimmed;
}
- public BalancerDatanode newDatanode(DatanodeStorageReport r) {
- return new BalancerDatanode(r, maxConcurrentMovesPerNode);
+ public DDatanode newDatanode(DatanodeStorageReport r) {
+ return new DDatanode(r, maxConcurrentMovesPerNode);
}
public boolean dispatchAndCheckContinue() throws InterruptedException {
@@ -884,8 +858,8 @@ public class Dispatcher {
private void waitForMoveCompletion() {
for(;;) {
boolean empty = true;
- for (BalancerDatanode.StorageGroup t : targets) {
- if (!t.getBalancerDatanode().isPendingQEmpty()) {
+ for (StorageGroup t : targets) {
+ if (!t.getDDatanode().isPendingQEmpty()) {
empty = false;
break;
}
@@ -907,8 +881,8 @@ public class Dispatcher {
* 2. the block does not have a replica on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
- private boolean isGoodBlockCandidate(Source source,
- BalancerDatanode.StorageGroup target, DBlock block) {
+ private boolean isGoodBlockCandidate(Source source, StorageGroup target,
+ DBlock block) {
if (source.storageType != target.storageType) {
return false;
}
@@ -933,17 +907,17 @@ public class Dispatcher {
* Determine whether moving the given block replica from source to target
* would reduce the number of racks of the block replicas.
*/
- private boolean reduceNumOfRacks(Source source,
- BalancerDatanode.StorageGroup target, DBlock block) {
- final DatanodeInfo sourceDn = source.getDatanode();
- if (cluster.isOnSameRack(sourceDn, target.getDatanode())) {
+ private boolean reduceNumOfRacks(Source source, StorageGroup target,
+ DBlock block) {
+ final DatanodeInfo sourceDn = source.getDatanodeInfo();
+ if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
// source and target are on the same rack
return false;
}
boolean notOnSameRack = true;
synchronized (block) {
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+ for (StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
notOnSameRack = false;
break;
}
@@ -953,8 +927,8 @@ public class Dispatcher {
// target is not on the same rack as any replica
return false;
}
- for (BalancerDatanode.StorageGroup g : block.getLocations()) {
- if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) {
+ for (StorageGroup g : block.getLocations()) {
+ if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
// source is on the same rack of another replica
return false;
}
@@ -971,10 +945,10 @@ public class Dispatcher {
* group with target
*/
private boolean isOnSameNodeGroupWithReplicas(
- BalancerDatanode.StorageGroup target, DBlock block, Source source) {
- final DatanodeInfo targetDn = target.getDatanode();
- for (BalancerDatanode.StorageGroup g : block.getLocations()) {
- if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) {
+ StorageGroup target, DBlock block, Source source) {
+ final DatanodeInfo targetDn = target.getDatanodeInfo();
+ for (StorageGroup g : block.getLocations()) {
+ if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
return true;
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Mon Aug 11 18:02:57 2014
@@ -570,10 +570,10 @@ public class TestBalancer {
final int r = Balancer.run(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
- assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
return;
} else {
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
@@ -717,7 +717,7 @@ public class TestBalancer {
Balancer.Parameters.DEFAULT.threshold,
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
final int r = Balancer.run(namenodes, p, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally {
cluster.shutdown();
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Mon Aug 11 18:02:57 2014
@@ -98,7 +98,7 @@ public class TestBalancerWithHANameNodes
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, Balancer.Parameters.DEFAULT);
} finally {
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Mon Aug 11 18:02:57 2014
@@ -160,7 +160,7 @@ public class TestBalancerWithMultipleNam
// start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
- Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2");
wait(s.clients, totalUsed, totalCapacity);
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1617338&r1=1617337&r2=1617338&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Mon Aug 11 18:02:57 2014
@@ -176,7 +176,7 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
- assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
@@ -190,8 +190,8 @@ public class TestBalancerWithNodeGroup {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
- Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
- (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+ Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
+ (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
}