Author: jing9
Date: Mon Aug 11 22:27:50 2014
New Revision: 1617377
URL: http://svn.apache.org/r1617377
Log: Merging r1616894 through r1617376 from trunk.
Added: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java - copied unchanged from r1617376, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ (props changed) hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed) hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1616894-1617376 Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Mon Aug 11 22:27:50 2014 @@ -724,6 +724,10 @@ public class RpcProgramNfs3 extends RpcP FSDataInputStream fis = clientCache.getDfsInputStream(userName, 
Nfs3Utils.getFileIdPath(handle)); + if (fis == null) { + return new READ3Response(Nfs3Status.NFS3ERR_ACCES); + } + try { readCount = fis.read(offset, readbuffer, 0, count); } catch (IOException e) { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java Mon Aug 11 22:27:50 2014 @@ -278,13 +278,11 @@ public class TestRpcProgramNfs3 { readReq.serialize(xdr_req); // Attempt by an unpriviledged user should fail. - /* Hits HDFS-6582. It needs to be fixed first. READ3Response response1 = nfsd.read(xdr_req.asReadOnlyWrap(), securityHandlerUnpriviledged, new InetSocketAddress("localhost", 1234)); assertEquals("Incorrect return code:", Nfs3Status.NFS3ERR_ACCES, response1.getStatus()); - */ // Attempt by a priviledged user should pass. 
READ3Response response2 = nfsd.read(xdr_req.asReadOnlyWrap(), Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Aug 11 22:27:50 2014 @@ -407,6 +407,12 @@ Release 2.6.0 - UNRELEASED HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via jing9) + HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via + jing9) + + HDFS-6838. Code cleanup for unnecessary INode replacement. + (Jing Zhao via wheat9) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) @@ -502,6 +508,9 @@ Release 2.6.0 - UNRELEASED HDFS-6791. A block could remain under replicated if all of its replicas are on decommissioned nodes. (Ming Ma via jing9) + HDFS-6582. 
Missing null check in RpcProgramNfs3#read(XDR, SecurityHandler) + (Abhiraj Butala via brandonli) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1616894-1617376 Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug 11 22:27:50 2014 @@ -44,7 +44,8 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; @@ -184,10 +185,10 @@ public class Balancer { // all data node lists private final Collection<Source> overUtilized = new LinkedList<Source>(); private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); - private final 
Collection<BalancerDatanode.StorageGroup> belowAvgUtilized - = new LinkedList<BalancerDatanode.StorageGroup>(); - private final Collection<BalancerDatanode.StorageGroup> underUtilized - = new LinkedList<BalancerDatanode.StorageGroup>(); + private final Collection<StorageGroup> belowAvgUtilized + = new LinkedList<StorageGroup>(); + private final Collection<StorageGroup> underUtilized + = new LinkedList<StorageGroup>(); /* Check that this Balancer is compatible with the Block Placement Policy * used by the Namenode. @@ -209,8 +210,22 @@ public class Balancer { * when connection fails. */ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + final int moverThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT); + final int dispatcherThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT); + final int maxConcurrentMovesPerNode = conf.getInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, - p.nodesToBeExcluded, conf); + p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, + maxConcurrentMovesPerNode, conf); this.threshold = p.threshold; this.policy = p.policy; } @@ -255,7 +270,7 @@ public class Balancer { // over-utilized, above-average, below-average and under-utilized. 
long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { - final BalancerDatanode dn = dispatcher.newDatanode(r); + final DDatanode dn = dispatcher.newDatanode(r); for(StorageType t : StorageType.asList()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -268,9 +283,9 @@ public class Balancer { final long maxSize2Move = computeMaxSize2Move(capacity, getRemaining(r, t), utilizationDiff, threshold); - final BalancerDatanode.StorageGroup g; + final StorageGroup g; if (utilizationDiff > 0) { - final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher); + final Source s = dn.addSource(t, maxSize2Move, dispatcher); if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { @@ -279,7 +294,7 @@ public class Balancer { } g = s; } else { - g = dn.addStorageGroup(t, utilization, maxSize2Move); + g = dn.addStorageGroup(t, maxSize2Move); if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { @@ -328,7 +343,7 @@ public class Balancer { logUtilizationCollection("underutilized", underUtilized); } - private static <T extends BalancerDatanode.StorageGroup> + private static <T extends StorageGroup> void logUtilizationCollection(String name, Collection<T> items) { LOG.info(items.size() + " " + name + ": " + items); } @@ -381,8 +396,7 @@ public class Balancer { * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ - private <G extends BalancerDatanode.StorageGroup, - C extends BalancerDatanode.StorageGroup> + private <G extends StorageGroup, C extends StorageGroup> void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, Matcher matcher) { for(final Iterator<G> i = groups.iterator(); i.hasNext();) { @@ -398,9 +412,8 @@ public class Balancer { * For the given datanode, choose a candidate and then schedule it. 
* @return true if a candidate is chosen; false if no candidates is chosen. */ - private <C extends BalancerDatanode.StorageGroup> - boolean choose4One(BalancerDatanode.StorageGroup g, - Collection<C> candidates, Matcher matcher) { + private <C extends StorageGroup> boolean choose4One(StorageGroup g, + Collection<C> candidates, Matcher matcher) { final Iterator<C> i = candidates.iterator(); final C chosen = chooseCandidate(g, i, matcher); @@ -418,8 +431,7 @@ public class Balancer { return true; } - private void matchSourceWithTargetToMove(Source source, - BalancerDatanode.StorageGroup target) { + private void matchSourceWithTargetToMove(Source source, StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); final Task task = new Task(target, size); source.addTask(task); @@ -430,8 +442,7 @@ public class Balancer { } /** Choose a candidate for the given datanode. */ - private <G extends BalancerDatanode.StorageGroup, - C extends BalancerDatanode.StorageGroup> + private <G extends StorageGroup, C extends StorageGroup> C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { @@ -439,7 +450,7 @@ public class Balancer { if (!c.hasSpaceForScheduling()) { candidates.remove(); } else if (matcher.match(dispatcher.getCluster(), - g.getDatanode(), c.getDatanode())) { + g.getDatanodeInfo(), c.getDatanodeInfo())) { return c; } } @@ -457,34 +468,15 @@ public class Balancer { dispatcher.reset(conf);; } - // Exit status - enum ReturnStatus { - // These int values will map directly to the balancer process's exit code. - SUCCESS(0), - IN_PROGRESS(1), - ALREADY_RUNNING(-1), - NO_MOVE_BLOCK(-2), - NO_MOVE_PROGRESS(-3), - IO_EXCEPTION(-4), - ILLEGAL_ARGS(-5), - INTERRUPTED(-6); - - final int code; - - ReturnStatus(int code) { - this.code = code; - } - } - /** Run an iteration for all datanodes. 
*/ - private ReturnStatus run(int iteration, Formatter formatter, + private ExitStatus run(int iteration, Formatter formatter, Configuration conf) { try { final List<DatanodeStorageReport> reports = dispatcher.init(); final long bytesLeftToMove = init(reports); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); - return ReturnStatus.SUCCESS; + return ExitStatus.SUCCESS; } else { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); @@ -498,7 +490,7 @@ public class Balancer { final long bytesToMove = chooseStorageGroups(); if (bytesToMove == 0) { System.out.println("No block can be moved. Exiting..."); - return ReturnStatus.NO_MOVE_BLOCK; + return ExitStatus.NO_MOVE_BLOCK; } else { LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + " in this iteration"); @@ -519,19 +511,19 @@ public class Balancer { * Exit no byte has been moved for 5 consecutive iterations. */ if (!dispatcher.dispatchAndCheckContinue()) { - return ReturnStatus.NO_MOVE_PROGRESS; + return ExitStatus.NO_MOVE_PROGRESS; } - return ReturnStatus.IN_PROGRESS; + return ExitStatus.IN_PROGRESS; } catch (IllegalArgumentException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.ILLEGAL_ARGS; + return ExitStatus.ILLEGAL_ARGUMENTS; } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION; + return ExitStatus.IO_EXCEPTION; } catch (InterruptedException e) { System.out.println(e + ". 
Exiting ..."); - return ReturnStatus.INTERRUPTED; + return ExitStatus.INTERRUPTED; } finally { dispatcher.shutdownNow(); } @@ -570,14 +562,14 @@ public class Balancer { Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { final Balancer b = new Balancer(nnc, p, conf); - final ReturnStatus r = b.run(iteration, formatter, conf); + final ExitStatus r = b.run(iteration, formatter, conf); // clean all lists b.resetData(conf); - if (r == ReturnStatus.IN_PROGRESS) { + if (r == ExitStatus.IN_PROGRESS) { done = false; - } else if (r != ReturnStatus.SUCCESS) { + } else if (r != ExitStatus.SUCCESS) { //must be an error statue, return. - return r.code; + return r.getExitCode(); } } @@ -590,7 +582,7 @@ public class Balancer { nnc.close(); } } - return ReturnStatus.SUCCESS.code; + return ExitStatus.SUCCESS.getExitCode(); } /* Given elaspedTime in ms, return a printable string */ @@ -661,10 +653,10 @@ public class Balancer { return Balancer.run(namenodes, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION.code; + return ExitStatus.IO_EXCEPTION.getExitCode(); } catch (InterruptedException e) { System.out.println(e + ". 
Exiting ..."); - return ReturnStatus.INTERRUPTED.code; + return ExitStatus.INTERRUPTED.getExitCode(); } finally { System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Balancing took " + time2Str(Time.now()-startTime)); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Mon Aug 11 22:27:50 2014 @@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; @@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.d import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; @@ 
-91,7 +91,6 @@ public class Dispatcher { // minutes private final NameNodeConnector nnc; - private final KeyManager keyManager; private final SaslDataTransferClient saslClient; /** Set of datanodes to be excluded. */ @@ -100,11 +99,10 @@ public class Dispatcher { private final Set<String> includedNodes; private final Collection<Source> sources = new HashSet<Source>(); - private final Collection<BalancerDatanode.StorageGroup> targets - = new HashSet<BalancerDatanode.StorageGroup>(); + private final Collection<StorageGroup> targets = new HashSet<StorageGroup>(); private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); - private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks; + private final MovedBlocks<StorageGroup> movedBlocks; /** Map (datanodeUuid,storageType -> StorageGroup) */ private final StorageGroupMap storageGroupMap = new StorageGroupMap(); @@ -135,8 +133,7 @@ public class Dispatcher { } /** Remove all blocks except for the moved blocks. */ - private void removeAllButRetain( - MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) { + private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) { for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { if (!movedBlocks.contains(i.next())) { i.remove(); @@ -150,17 +147,15 @@ public class Dispatcher { return datanodeUuid + ":" + storageType; } - private final Map<String, BalancerDatanode.StorageGroup> map - = new HashMap<String, BalancerDatanode.StorageGroup>(); + private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>(); - BalancerDatanode.StorageGroup get(String datanodeUuid, - StorageType storageType) { + StorageGroup get(String datanodeUuid, StorageType storageType) { return map.get(toKey(datanodeUuid, storageType)); } - void put(BalancerDatanode.StorageGroup g) { - final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); - final BalancerDatanode.StorageGroup existing = map.put(key, g); + void put(StorageGroup g) { + 
final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); + final StorageGroup existing = map.put(key, g); Preconditions.checkState(existing == null); } @@ -177,8 +172,8 @@ public class Dispatcher { private class PendingMove { private DBlock block; private Source source; - private BalancerDatanode proxySource; - private BalancerDatanode.StorageGroup target; + private DDatanode proxySource; + private StorageGroup target; private PendingMove() { } @@ -235,24 +230,24 @@ public class Dispatcher { * @return true if a proxy is found; otherwise false */ private boolean chooseProxySource() { - final DatanodeInfo targetDN = target.getDatanode(); + final DatanodeInfo targetDN = target.getDatanodeInfo(); // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } } // check if there is replica which is on the same rack with the target - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : block.getLocations()) { if (addTo(loc)) { return true; } @@ -261,10 +256,10 @@ public class Dispatcher { } /** add to a proxy source for specific block movement */ - private boolean addTo(BalancerDatanode.StorageGroup g) { - final BalancerDatanode bdn = g.getBalancerDatanode(); - if (bdn.addPendingBlock(this)) { - proxySource = bdn; + private boolean addTo(StorageGroup g) { + final DDatanode dn = 
g.getDDatanode(); + if (dn.addPendingBlock(this)) { + proxySource = dn; return true; } return false; @@ -281,14 +276,13 @@ public class Dispatcher { DataInputStream in = null; try { sock.connect( - NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), + NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); /* * Unfortunately we don't have a good way to know if the Datanode is * taking a really long time to move a block, OR something has gone * wrong and it's never going to finish. To deal with this scenario, we - * set a long timeout (20 minutes) to avoid hanging the balancer - * indefinitely. + * set a long timeout (20 minutes) to avoid hanging indefinitely. */ sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); @@ -298,9 +292,10 @@ public class Dispatcher { InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); - Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb); + final KeyManager km = nnc.getKeyManager(); + Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyManager, accessToken, target.getDatanode()); + unbufIn, km, accessToken, target.getDatanodeInfo()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, @@ -314,21 +309,19 @@ public class Dispatcher { LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); - /* - * proxy or target may have an issue, insert a small delay before using - * these nodes further. This avoids a potential storm of - * "threads quota exceeded" Warnings when the balancer gets out of sync - * with work going on in datanode. 
- */ + // Proxy or target may have some issues, delay before using these nodes + // further in order to avoid a potential storm of "threads quota + // exceeded" warnings when the dispatcher gets out of sync with work + // going on in datanodes. proxySource.activateDelay(DELAY_AFTER_ERROR); - target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); + target.getDDatanode().activateDelay(DELAY_AFTER_ERROR); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); proxySource.removePendingBlock(this); - target.getBalancerDatanode().removePendingBlock(this); + target.getDDatanode().removePendingBlock(this); synchronized (this) { reset(); @@ -342,8 +335,8 @@ public class Dispatcher { /** Send a block replace request to the output stream */ private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token<BlockTokenIdentifier> accessToken) throws IOException { - new Sender(out).replaceBlock(eb, target.storageType, accessToken, source - .getDatanode().getDatanodeUuid(), proxySource.datanode); + new Sender(out).replaceBlock(eb, target.storageType, accessToken, + source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } /** Receive a block copy response from the input stream */ @@ -368,8 +361,7 @@ public class Dispatcher { } /** A class for keeping track of block locations in the dispatcher. */ - private static class DBlock extends - MovedBlocks.Locations<BalancerDatanode.StorageGroup> { + private static class DBlock extends MovedBlocks.Locations<StorageGroup> { DBlock(Block block) { super(block); } @@ -377,10 +369,10 @@ public class Dispatcher { /** The class represents a desired move. 
*/ static class Task { - private final BalancerDatanode.StorageGroup target; + private final StorageGroup target; private long size; // bytes scheduled to move - Task(BalancerDatanode.StorageGroup target, long size) { + Task(StorageGroup target, long size) { this.target = target; this.size = size; } @@ -391,28 +383,25 @@ public class Dispatcher { } /** A class that keeps track of a datanode. */ - static class BalancerDatanode { + static class DDatanode { /** A group of storages in a datanode with the same storage type. */ class StorageGroup { final StorageType storageType; - final double utilization; final long maxSize2Move; private long scheduledSize = 0L; - private StorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { + private StorageGroup(StorageType storageType, long maxSize2Move) { this.storageType = storageType; - this.utilization = utilization; this.maxSize2Move = maxSize2Move; } - BalancerDatanode getBalancerDatanode() { - return BalancerDatanode.this; + private DDatanode getDDatanode() { + return DDatanode.this; } - DatanodeInfo getDatanode() { - return BalancerDatanode.this.datanode; + DatanodeInfo getDatanodeInfo() { + return DDatanode.this.datanode; } /** Decide if still need to move more bytes */ @@ -447,7 +436,7 @@ public class Dispatcher { @Override public String toString() { - return "" + utilization; + return getDisplayName(); } } @@ -461,10 +450,10 @@ public class Dispatcher { @Override public String toString() { - return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; + return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values(); } - private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { + private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { this.datanode = r.getDatanodeInfo(); this.maxConcurrentMoves = maxConcurrentMoves; this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves); @@ -475,18 +464,14 @@ public class Dispatcher { 
Preconditions.checkState(existing == null); } - StorageGroup addStorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - final StorageGroup g = new StorageGroup(storageType, utilization, - maxSize2Move); + StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) { + final StorageGroup g = new StorageGroup(storageType, maxSize2Move); put(storageType, g); return g; } - Source addSource(StorageType storageType, double utilization, - long maxSize2Move, Dispatcher balancer) { - final Source s = balancer.new Source(storageType, utilization, - maxSize2Move, this); + Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { + final Source s = d.new Source(storageType, maxSize2Move, this); put(storageType, s); return s; } @@ -528,7 +513,7 @@ public class Dispatcher { } /** A node that can be the sources of a block move */ - class Source extends BalancerDatanode.StorageGroup { + class Source extends DDatanode.StorageGroup { private final List<Task> tasks = new ArrayList<Task>(2); private long blocksToReceive = 0L; @@ -539,9 +524,8 @@ public class Dispatcher { */ private final List<DBlock> srcBlocks = new ArrayList<DBlock>(); - private Source(StorageType storageType, double utilization, - long maxSize2Move, BalancerDatanode dn) { - dn.super(storageType, utilization, maxSize2Move); + private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { + dn.super(storageType, maxSize2Move); } /** Add a task */ @@ -565,7 +549,7 @@ public class Dispatcher { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size); + final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); long bytesReceived = 0; for (BlockWithLocations blk : newBlocks.getBlocks()) { @@ -579,7 +563,7 @@ public class Dispatcher { final String[] datanodeUuids = blk.getDatanodeUuids(); final 
StorageType[] storageTypes = blk.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { - final BalancerDatanode.StorageGroup g = storageGroupMap.get( + final StorageGroup g = storageGroupMap.get( datanodeUuids[i], storageTypes[i]); if (g != null) { // not unknown block.addLocation(g); @@ -617,7 +601,7 @@ public class Dispatcher { private PendingMove chooseNextMove() { for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { final Task task = i.next(); - final BalancerDatanode target = task.target.getBalancerDatanode(); + final DDatanode target = task.target.getDDatanode(); PendingMove pendingBlock = new PendingMove(); if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation @@ -670,7 +654,7 @@ public class Dispatcher { final long startTime = Time.monotonicNow(); this.blocksToReceive = 2 * getScheduledSize(); boolean isTimeUp = false; - int noPendingBlockIteration = 0; + int noPendingMoveIteration = 0; while (!isTimeUp && getScheduledSize() > 0 && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { final PendingMove p = chooseNextMove(); @@ -699,11 +683,11 @@ public class Dispatcher { return; } } else { - // source node cannot find a pendingBlockToMove, iteration +1 - noPendingBlockIteration++; + // source node cannot find a pending block to move, iteration +1 + noPendingMoveIteration++; // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. 
- if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { + if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { resetScheduledSize(); } } @@ -726,29 +710,19 @@ public class Dispatcher { } } - Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes, - Set<String> excludedNodes, Configuration conf) { - this.nnc = theblockpool; - this.keyManager = nnc.getKeyManager(); + public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, + Set<String> excludedNodes, long movedWinWidth, int moverThreads, + int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { + this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; - - final long movedWinWidth = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); - movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth); + this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth); this.cluster = NetworkTopology.getInstance(conf); - this.moveExecutor = Executors.newFixedThreadPool(conf.getInt( - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); - this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt( - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); - this.maxConcurrentMovesPerNode = conf.getInt( - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.moveExecutor = Executors.newFixedThreadPool(moverThreads); + this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads); + this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; final boolean fallbackToSimpleAuthAllowed = conf.getBoolean( CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, @@ -784,7 +758,7 @@ public class Dispatcher { return b; } - void add(Source source, 
BalancerDatanode.StorageGroup target) { + void add(Source source, StorageGroup target) { sources.add(source); targets.add(target); } @@ -826,8 +800,8 @@ public class Dispatcher { return trimmed; } - public BalancerDatanode newDatanode(DatanodeStorageReport r) { - return new BalancerDatanode(r, maxConcurrentMovesPerNode); + public DDatanode newDatanode(DatanodeStorageReport r) { + return new DDatanode(r, maxConcurrentMovesPerNode); } public boolean dispatchAndCheckContinue() throws InterruptedException { @@ -884,8 +858,8 @@ public class Dispatcher { private void waitForMoveCompletion() { for(;;) { boolean empty = true; - for (BalancerDatanode.StorageGroup t : targets) { - if (!t.getBalancerDatanode().isPendingQEmpty()) { + for (StorageGroup t : targets) { + if (!t.getDDatanode().isPendingQEmpty()) { empty = false; break; } @@ -907,8 +881,8 @@ public class Dispatcher { * 2. the block does not have a replica on the target; * 3. doing the move does not reduce the number of racks that the block has */ - private boolean isGoodBlockCandidate(Source source, - BalancerDatanode.StorageGroup target, DBlock block) { + private boolean isGoodBlockCandidate(Source source, StorageGroup target, + DBlock block) { if (source.storageType != target.storageType) { return false; } @@ -933,17 +907,17 @@ public class Dispatcher { * Determine whether moving the given block replica from source to target * would reduce the number of racks of the block replicas. 
*/ - private boolean reduceNumOfRacks(Source source, - BalancerDatanode.StorageGroup target, DBlock block) { - final DatanodeInfo sourceDn = source.getDatanode(); - if (cluster.isOnSameRack(sourceDn, target.getDatanode())) { + private boolean reduceNumOfRacks(Source source, StorageGroup target, + DBlock block) { + final DatanodeInfo sourceDn = source.getDatanodeInfo(); + if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { // source and target are on the same rack return false; } boolean notOnSameRack = true; synchronized (block) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) { notOnSameRack = false; break; } @@ -953,8 +927,8 @@ public class Dispatcher { // target is not on the same rack as any replica return false; } - for (BalancerDatanode.StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) { + for (StorageGroup g : block.getLocations()) { + if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) { // source is on the same rack of another replica return false; } @@ -971,10 +945,10 @@ public class Dispatcher { * group with target */ private boolean isOnSameNodeGroupWithReplicas( - BalancerDatanode.StorageGroup target, DBlock block, Source source) { - final DatanodeInfo targetDn = target.getDatanode(); - for (BalancerDatanode.StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) { + StorageGroup target, DBlock block, Source source) { + final DatanodeInfo targetDn = target.getDatanodeInfo(); + for (StorageGroup g : block.getLocations()) { + if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { return true; } } Modified: 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Aug 11 22:27:50 2014 @@ -768,8 +768,6 @@ public class FSDirectory implements Clos checkSnapshot(srcInode, null); } - - private class RenameOperation { private final INodesInPath srcIIP; private final INodesInPath dstIIP; @@ -802,7 +800,7 @@ public class FSDirectory implements Clos // snapshot is taken on the dst tree, changes will be recorded in the latest // snapshot of the src tree. if (isSrcInSnapshot) { - srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId()); + srcChild.recordModification(srcIIP.getLatestSnapshotId()); } // check srcChild for reference @@ -932,8 +930,7 @@ public class FSDirectory implements Clos updateCount(iip, 0, dsDelta, true); } - file = file.setFileReplication(replication, iip.getLatestSnapshotId(), - inodeMap); + file.setFileReplication(replication, iip.getLatestSnapshotId()); final short newBR = file.getBlockReplication(); // check newBR < oldBR case. 
@@ -1216,8 +1213,7 @@ public class FSDirectory implements Clos // record modification final int latestSnapshot = iip.getLatestSnapshotId(); - targetNode = targetNode.recordModification(latestSnapshot); - iip.setLastINode(targetNode); + targetNode.recordModification(latestSnapshot); // Remove the node from the namespace long removed = removeLastINode(iip); @@ -2126,7 +2122,7 @@ public class FSDirectory implements Clos } final int latest = iip.getLatestSnapshotId(); - dirNode = dirNode.recordModification(latest); + dirNode.recordModification(latest); dirNode.setQuota(nsQuota, dsQuota); return dirNode; } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug 11 22:27:50 2014 @@ -2515,7 +2515,7 @@ public class FSNamesystem implements Nam boolean writeToEditLog, int latestSnapshot, boolean logRetryCache) throws IOException { - file = file.recordModification(latestSnapshot); + file.recordModification(latestSnapshot); final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine); leaseManager.addLease(cons.getFileUnderConstructionFeature() @@ -4214,7 +4214,7 @@ public class FSNamesystem implements Nam Preconditions.checkArgument(uc != null); leaseManager.removeLease(uc.getClientName(), src); - pendingFile = pendingFile.recordModification(latestSnapshot); + 
pendingFile.recordModification(latestSnapshot); // The file is no longer pending. // Create permanent INode, update blocks. No need to replace the inode here Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Aug 11 22:27:50 2014 @@ -97,9 +97,9 @@ public abstract class INode implements I /** Set user */ final INode setUser(String user, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setUser(user); - return nodeToUpdate; + recordModification(latestSnapshotId); + setUser(user); + return this; } /** * @param snapshotId @@ -122,9 +122,9 @@ public abstract class INode implements I /** Set group */ final INode setGroup(String group, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setGroup(group); - return nodeToUpdate; + recordModification(latestSnapshotId); + setGroup(group); + return this; } /** @@ -148,9 +148,9 @@ public abstract class INode implements I /** Set the {@link FsPermission} of this {@link INode} */ INode setPermission(FsPermission permission, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setPermission(permission); - return nodeToUpdate; + 
recordModification(latestSnapshotId); + setPermission(permission); + return this; } abstract AclFeature getAclFeature(int snapshotId); @@ -164,18 +164,18 @@ public abstract class INode implements I final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.addAclFeature(aclFeature); - return nodeToUpdate; + recordModification(latestSnapshotId); + addAclFeature(aclFeature); + return this; } abstract void removeAclFeature(); final INode removeAclFeature(int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.removeAclFeature(); - return nodeToUpdate; + recordModification(latestSnapshotId); + removeAclFeature(); + return this; } /** @@ -199,9 +199,9 @@ public abstract class INode implements I final INode addXAttrFeature(XAttrFeature xAttrFeature, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.addXAttrFeature(xAttrFeature); - return nodeToUpdate; + recordModification(latestSnapshotId); + addXAttrFeature(xAttrFeature); + return this; } /** @@ -211,9 +211,9 @@ public abstract class INode implements I final INode removeXAttrFeature(int lastestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(lastestSnapshotId); - nodeToUpdate.removeXAttrFeature(); - return nodeToUpdate; + recordModification(lastestSnapshotId); + removeXAttrFeature(); + return this; } /** @@ -298,11 +298,8 @@ public abstract class INode implements I * @param latestSnapshotId The id of the latest snapshot that has been taken. * Note that it is {@link Snapshot#CURRENT_STATE_ID} * if no snapshots have been taken. - * @return The current inode, which usually is the same object of this inode. 
- * However, in some cases, this inode may be replaced with a new inode - * for maintaining snapshots. The current inode is then the new inode. */ - abstract INode recordModification(final int latestSnapshotId) + abstract void recordModification(final int latestSnapshotId) throws QuotaExceededException; /** Check whether it's a reference. */ @@ -652,9 +649,9 @@ public abstract class INode implements I /** Set the last modification time of inode. */ public final INode setModificationTime(long modificationTime, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setModificationTime(modificationTime); - return nodeToUpdate; + recordModification(latestSnapshotId); + setModificationTime(modificationTime); + return this; } /** @@ -682,9 +679,9 @@ public abstract class INode implements I */ public final INode setAccessTime(long accessTime, int latestSnapshotId) throws QuotaExceededException { - final INode nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setAccessTime(accessTime); - return nodeToUpdate; + recordModification(latestSnapshotId); + setAccessTime(accessTime); + return this; } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Aug 11 22:27:50 2014 @@ -318,7 +318,7 @@ public class 
INodeDirectory extends INod } @Override - public INodeDirectory recordModification(int latestSnapshotId) + public void recordModification(int latestSnapshotId) throws QuotaExceededException { if (isInLatestSnapshot(latestSnapshotId) && !shouldRecordInSrcSnapshot(latestSnapshotId)) { @@ -330,7 +330,6 @@ public class INodeDirectory extends INod // record self in the diff list if necessary sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null); } - return this; } /** Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Aug 11 22:27:50 2014 @@ -296,7 +296,7 @@ public class INodeFile extends INodeWith } @Override - public INodeFile recordModification(final int latestSnapshotId) + public void recordModification(final int latestSnapshotId) throws QuotaExceededException { if (isInLatestSnapshot(latestSnapshotId) && !shouldRecordInSrcSnapshot(latestSnapshotId)) { @@ -308,7 +308,6 @@ public class INodeFile extends INodeWith // record self in the diff list if necessary sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null); } - return this; } public FileDiffList getDiffs() { @@ -356,11 +355,10 @@ public class INodeFile extends INodeWith /** Set the replication factor of this file. 
*/ public final INodeFile setFileReplication(short replication, - int latestSnapshotId, final INodeMap inodeMap) - throws QuotaExceededException { - final INodeFile nodeToUpdate = recordModification(latestSnapshotId); - nodeToUpdate.setFileReplication(replication); - return nodeToUpdate; + int latestSnapshotId) throws QuotaExceededException { + recordModification(latestSnapshotId); + setFileReplication(replication); + return this; } /** @return preferred block size (in bytes) of the file. */ Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Mon Aug 11 22:27:50 2014 @@ -93,9 +93,8 @@ public class INodeMap { "", "", new FsPermission((short) 0)), 0, 0) { @Override - INode recordModification(int latestSnapshotId) + void recordModification(int latestSnapshotId) throws QuotaExceededException { - return null; } @Override Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Mon Aug 11 22:27:50 2014 @@ -287,11 +287,9 @@ public abstract class INodeReference ext } @Override - final INode recordModification(int latestSnapshotId) + final void recordModification(int latestSnapshotId) throws QuotaExceededException { referred.recordModification(latestSnapshotId); - // reference is never replaced - return this; } @Override // used by WithCount Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Mon Aug 11 22:27:50 2014 @@ -47,12 +47,11 @@ public class INodeSymlink extends INodeW } @Override - INode recordModification(int latestSnapshotId) throws QuotaExceededException { + void recordModification(int latestSnapshotId) throws QuotaExceededException { if (isInLatestSnapshot(latestSnapshotId)) { INodeDirectory parent = getParent(); parent.saveChild2Snapshot(this, latestSnapshotId, new INodeSymlink(this)); } - return this; } /** @return true unconditionally. 
*/ Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Mon Aug 11 22:27:50 2014 @@ -570,10 +570,10 @@ public class TestBalancer { final int r = Balancer.run(namenodes, p, conf); if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { - assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r); + assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); return; } else { - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); @@ -717,7 +717,7 @@ public class TestBalancer { Balancer.Parameters.DEFAULT.threshold, datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); final int r = Balancer.run(namenodes, p, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { cluster.shutdown(); } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Mon Aug 11 22:27:50 2014 @@ -98,7 +98,7 @@ public class TestBalancerWithHANameNodes assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, Balancer.Parameters.DEFAULT); } finally { Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Mon Aug 11 22:27:50 2014 @@ -160,7 +160,7 @@ public class 
TestBalancerWithMultipleNam // start rebalancing final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); - Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); LOG.info("BALANCER 2"); wait(s.clients, totalUsed, totalCapacity); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1617377&r1=1617376&r2=1617377&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Mon Aug 11 22:27:50 2014 @@ -176,7 +176,7 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); @@ -190,8 +190,8 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || - (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); + Assert.assertTrue(r == 
ExitStatus.SUCCESS.getExitCode() || + (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); }