HDFS-9390. Block management for maintenance states.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b61fb267 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b61fb267 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b61fb267 Branch: refs/heads/HADOOP-13070 Commit: b61fb267b92b2736920b4bd0c673d31e7632ebb9 Parents: f5d9235 Author: Ming Ma <min...@apache.org> Authored: Mon Oct 17 17:45:41 2016 -0700 Committer: Ming Ma <min...@apache.org> Committed: Mon Oct 17 17:45:41 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 53 +- .../hadoop/hdfs/server/balancer/Dispatcher.java | 11 +- .../server/blockmanagement/BlockManager.java | 249 ++++-- .../BlockPlacementPolicyDefault.java | 4 +- .../CacheReplicationMonitor.java | 2 +- .../blockmanagement/DatanodeDescriptor.java | 35 +- .../server/blockmanagement/DatanodeManager.java | 47 +- .../blockmanagement/DecommissionManager.java | 142 +++- .../blockmanagement/ErasureCodingWork.java | 16 +- .../blockmanagement/HeartbeatManager.java | 23 +- .../blockmanagement/LowRedundancyBlocks.java | 47 +- .../server/blockmanagement/NumberReplicas.java | 30 +- .../blockmanagement/StorageTypeStats.java | 8 +- .../hdfs/server/namenode/FSNamesystem.java | 9 +- .../src/main/resources/hdfs-default.xml | 7 + .../apache/hadoop/hdfs/AdminStatesBaseTest.java | 20 +- .../apache/hadoop/hdfs/TestDecommission.java | 2 +- .../hadoop/hdfs/TestMaintenanceState.java | 775 +++++++++++++++++-- .../blockmanagement/TestBlockManager.java | 8 +- .../namenode/TestDecommissioningStatus.java | 57 +- .../namenode/TestNamenodeCapacityReport.java | 78 +- .../hadoop/hdfs/util/HostsFileWriter.java | 1 + 23 files changed, 1240 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 10c0ad6..d54c109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -220,6 +220,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.reconstruction.pending.timeout-sec"; public static final int DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = -1; + public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY = + "dfs.namenode.maintenance.replication.min"; + public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT + = 1; + public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY; public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 83870cf..23166e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -124,48 +124,57 @@ public class DFSUtil { } /** - * Compartor for sorting DataNodeInfo[] based on decommissioned states. 
- * Decommissioned nodes are moved to the end of the array on sorting with - * this compartor. + * Comparator for sorting DataNodeInfo[] based on + * decommissioned and entering_maintenance states. */ - public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = - new Comparator<DatanodeInfo>() { - @Override - public int compare(DatanodeInfo a, DatanodeInfo b) { - return a.isDecommissioned() == b.isDecommissioned() ? 0 : - a.isDecommissioned() ? 1 : -1; + public static class ServiceComparator implements Comparator<DatanodeInfo> { + @Override + public int compare(DatanodeInfo a, DatanodeInfo b) { + // Decommissioned nodes will still be moved to the end of the list + if (a.isDecommissioned()) { + return b.isDecommissioned() ? 0 : 1; + } else if (b.isDecommissioned()) { + return -1; } - }; + // ENTERING_MAINTENANCE nodes should be after live nodes. + if (a.isEnteringMaintenance()) { + return b.isEnteringMaintenance() ? 0 : 1; + } else if (b.isEnteringMaintenance()) { + return -1; + } else { + return 0; + } + } + } /** - * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states. - * Decommissioned/stale nodes are moved to the end of the array on sorting - * with this comparator. - */ + * Comparator for sorting DataNodeInfo[] based on + * stale, decommissioned and entering_maintenance states. 
+ * Order: live -> stale -> entering_maintenance -> decommissioned + */ @InterfaceAudience.Private - public static class DecomStaleComparator implements Comparator<DatanodeInfo> { + public static class ServiceAndStaleComparator extends ServiceComparator { private final long staleInterval; /** - * Constructor of DecomStaleComparator + * Constructor of ServiceAndStaleComparator * * @param interval * The time interval for marking datanodes as stale is passed from * outside, since the interval may be changed dynamically */ - public DecomStaleComparator(long interval) { + public ServiceAndStaleComparator(long interval) { this.staleInterval = interval; } @Override public int compare(DatanodeInfo a, DatanodeInfo b) { - // Decommissioned nodes will still be moved to the end of the list - if (a.isDecommissioned()) { - return b.isDecommissioned() ? 0 : 1; - } else if (b.isDecommissioned()) { - return -1; + int ret = super.compare(a, b); + if (ret != 0) { + return ret; } + // Stale nodes will be moved behind the normal nodes boolean aStale = a.isStale(staleInterval); boolean bStale = b.isStale(staleInterval); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index aea0ae4..e5c5e53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -989,20 +989,17 @@ public class Dispatcher { } private boolean shouldIgnore(DatanodeInfo dn) { - // ignore decommissioned nodes - final boolean decommissioned = dn.isDecommissioned(); - 
// ignore decommissioning nodes - final boolean decommissioning = dn.isDecommissionInProgress(); + // ignore out-of-service nodes + final boolean outOfService = !dn.isInService(); // ignore nodes in exclude list final boolean excluded = Util.isExcluded(excludedNodes, dn); // ignore nodes not in the include list (if include list is not empty) final boolean notIncluded = !Util.isIncluded(includedNodes, dn); - if (decommissioned || decommissioning || excluded || notIncluded) { + if (outOfService || excluded || notIncluded) { if (LOG.isTraceEnabled()) { LOG.trace("Excluding datanode " + dn - + ": decommissioned=" + decommissioned - + ", decommissioning=" + decommissioning + + ": outOfService=" + outOfService + ", excluded=" + excluded + ", notIncluded=" + notIncluded); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7b13add..03bdb7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -126,6 +126,29 @@ import org.slf4j.LoggerFactory; /** * Keeps information related to the blocks stored in the Hadoop cluster. + * For block state management, it tries to maintain the safety + * property of "# of live replicas == # of expected redundancy" under + * any events such as decommission, namenode failover, datanode failure. + * + * The motivation of maintenance mode is to allow admins quickly repair nodes + * without paying the cost of decommission. 
Thus with maintenance mode, + * # of live replicas doesn't have to be equal to # of expected redundancy. + * If any of the replica is in maintenance mode, the safety property + * is extended as follows. These property still apply for the case of zero + * maintenance replicas, thus we can use these safe property for all scenarios. + * a. # of live replicas >= # of min replication for maintenance. + * b. # of live replicas <= # of expected redundancy. + * c. # of live replicas and maintenance replicas >= # of expected redundancy. + * + * For regular replication, # of min live replicas for maintenance is determined + * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to <= + * DFS_NAMENODE_REPLICATION_MIN_KEY. + * For erasure encoding, # of min live replicas for maintenance is + * BlockInfoStriped#getRealDataBlockNum. + * + * Another safety property is to satisfy the block placement policy. While the + * policy is configurable, the replicas the policy is applied to are the live + * replicas + maintenance replicas. */ @InterfaceAudience.Private public class BlockManager implements BlockStatsMXBean { @@ -341,6 +364,11 @@ public class BlockManager implements BlockStatsMXBean { private final BlockIdManager blockIdManager; + /** Minimum live replicas needed for the datanode to be transitioned + * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
+ */ + private final short minReplicationToBeInMaintenance; + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -373,13 +401,13 @@ public class BlockManager implements BlockStatsMXBean { this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY, DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED); - this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, - DFSConfigKeys.DFS_REPLICATION_DEFAULT); + this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); - final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, - DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); + final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, + DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); if (minR <= 0) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY @@ -407,7 +435,7 @@ public class BlockManager implements BlockStatsMXBean { this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); - this.replicationRecheckInterval = + this.replicationRecheckInterval = conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT, TimeUnit.SECONDS) * 1000L; @@ -428,7 +456,7 @@ public class BlockManager implements BlockStatsMXBean { this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); - + this.maxNumBlocksToLog = conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, 
DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -438,6 +466,25 @@ public class BlockManager implements BlockStatsMXBean { this.getBlocksMinBlockSize = conf.getLongBytes( DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); + + final int minMaintenanceR = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT); + + if (minMaintenanceR < 0) { + throw new IOException("Unexpected configuration parameters: " + + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY + + " = " + minMaintenanceR + " < 0"); + } + if (minMaintenanceR > minR) { + throw new IOException("Unexpected configuration parameters: " + + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY + + " = " + minMaintenanceR + " > " + + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + + " = " + minR); + } + this.minReplicationToBeInMaintenance = (short)minMaintenanceR; + this.blockReportLeaseManager = new BlockReportLeaseManager(conf); bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf); @@ -668,7 +715,7 @@ public class BlockManager implements BlockStatsMXBean { // Dump all datanodes getDatanodeManager().datanodeDump(out); } - + /** * Dump the metadata for the given block in a human-readable * form. @@ -697,12 +744,12 @@ public class BlockManager implements BlockStatsMXBean { out.print(fileName + ": "); } // l: == live:, d: == decommissioned c: == corrupt e: == excess - out.print(block + ((usableReplicas > 0)? "" : " MISSING") + + out.print(block + ((usableReplicas > 0)? 
"" : " MISSING") + " (replicas:" + " l: " + numReplicas.liveReplicas() + " d: " + numReplicas.decommissionedAndDecommissioning() + " c: " + numReplicas.corruptReplicas() + - " e: " + numReplicas.excessReplicas() + ") "); + " e: " + numReplicas.excessReplicas() + ") "); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block); @@ -750,6 +797,18 @@ public class BlockManager implements BlockStatsMXBean { } } + public short getMinReplicationToBeInMaintenance() { + return minReplicationToBeInMaintenance; + } + + private short getMinMaintenanceStorageNum(BlockInfo block) { + if (block.isStriped()) { + return ((BlockInfoStriped) block).getRealDataBlockNum(); + } else { + return minReplicationToBeInMaintenance; + } + } + public boolean hasMinStorage(BlockInfo block) { return countNodes(block).liveReplicas() >= getMinStorageNum(block); } @@ -942,7 +1001,7 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas replicas = countNodes(lastBlock); neededReconstruction.remove(lastBlock, replicas.liveReplicas(), replicas.readOnlyReplicas(), - replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock)); + replicas.outOfServiceReplicas(), getExpectedRedundancyNum(lastBlock)); pendingReconstruction.remove(lastBlock); // remove this block from the list of pending blocks to be deleted. @@ -1078,7 +1137,8 @@ public class BlockManager implements BlockStatsMXBean { } else { isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes; } - final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas; + int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas; + numMachines -= numReplicas.maintenanceNotForReadReplicas(); DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; final byte[] blockIndices = blk.isStriped() ? 
new byte[numMachines] : null; int j = 0, i = 0; @@ -1086,11 +1146,17 @@ public class BlockManager implements BlockStatsMXBean { final boolean noCorrupt = (numCorruptReplicas == 0); for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { if (storage.getState() != State.FAILED) { + final DatanodeDescriptor d = storage.getDatanodeDescriptor(); + // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states. + if (d.isInMaintenance() + || (d.isEnteringMaintenance() && !d.isAlive())) { + continue; + } + if (noCorrupt) { machines[j++] = storage; i = setBlockIndices(blk, blockIndices, i, storage); } else { - final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = isReplicaCorrupt(blk, d); if (isCorrupt || !replicaCorrupt) { machines[j++] = storage; @@ -1106,7 +1172,7 @@ public class BlockManager implements BlockStatsMXBean { } assert j == machines.length : - "isCorrupt: " + isCorrupt + + "isCorrupt: " + isCorrupt + " numMachines: " + numMachines + " numNodes: " + numNodes + " numCorrupt: " + numCorruptNodes + @@ -1700,8 +1766,11 @@ public class BlockManager implements BlockStatsMXBean { return scheduledWork; } + // Check if the number of live + pending replicas satisfies + // the expected redundancy. 
boolean hasEnoughEffectiveReplicas(BlockInfo block, - NumberReplicas numReplicas, int pendingReplicaNum, int required) { + NumberReplicas numReplicas, int pendingReplicaNum) { + int required = getExpectedLiveRedundancyNum(block, numReplicas); int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; return (numEffectiveReplicas >= required) && (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); @@ -1716,8 +1785,6 @@ public class BlockManager implements BlockStatsMXBean { return null; } - short requiredRedundancy = getExpectedRedundancyNum(block); - // get a source data-node List<DatanodeDescriptor> containingNodes = new ArrayList<>(); List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); @@ -1726,6 +1793,8 @@ public class BlockManager implements BlockStatsMXBean { final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, containingNodes, liveReplicaNodes, numReplicas, liveBlockIndices, priority); + short requiredRedundancy = getExpectedLiveRedundancyNum(block, + numReplicas); if(srcNodes == null || srcNodes.length == 0) { // block can not be reconstructed from any node LOG.debug("Block " + block + " cannot be reconstructed " + @@ -1738,8 +1807,7 @@ public class BlockManager implements BlockStatsMXBean { assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); int pendingNum = pendingReconstruction.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredRedundancy)) { + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { neededReconstruction.remove(block, priority); blockLog.debug("BLOCK* Removing {} from neededReconstruction as" + " it has enough replicas", block); @@ -1763,9 +1831,11 @@ public class BlockManager implements BlockStatsMXBean { // should reconstruct all the internal blocks before scheduling // replication task for decommissioning node(s). 
- if (additionalReplRequired - numReplicas.decommissioning() > 0) { - additionalReplRequired = additionalReplRequired - - numReplicas.decommissioning(); + if (additionalReplRequired - numReplicas.decommissioning() - + numReplicas.liveEnteringMaintenanceReplicas() > 0) { + additionalReplRequired = additionalReplRequired - + numReplicas.decommissioning() - + numReplicas.liveEnteringMaintenanceReplicas(); } byte[] indices = new byte[liveBlockIndices.size()]; for (int i = 0 ; i < liveBlockIndices.size(); i++) { @@ -1807,11 +1877,11 @@ public class BlockManager implements BlockStatsMXBean { } // do not schedule more if enough replicas is already pending - final short requiredRedundancy = getExpectedRedundancyNum(block); NumberReplicas numReplicas = countNodes(block); + final short requiredRedundancy = + getExpectedLiveRedundancyNum(block, numReplicas); final int pendingNum = pendingReconstruction.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredRedundancy)) { + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { neededReconstruction.remove(block, priority); rw.resetTargets(); blockLog.debug("BLOCK* Removing {} from neededReplications as" + @@ -1880,7 +1950,7 @@ public class BlockManager implements BlockStatsMXBean { * @throws IOException * if the number of targets < minimum replication. * @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * Set, long, List, BlockStoragePolicy) + * Set, long, List, BlockStoragePolicy, EnumSet) */ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, final int numOfReplicas, final Node client, @@ -1987,13 +2057,15 @@ public class BlockManager implements BlockStatsMXBean { continue; } - // never use already decommissioned nodes or unknown state replicas - if (state == null || state == StoredReplicaState.DECOMMISSIONED) { + // never use already decommissioned nodes, maintenance node not + // suitable for read or unknown state replicas. 
+ if (state == null || state == StoredReplicaState.DECOMMISSIONED + || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) { continue; } if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY - && !node.isDecommissionInProgress() + && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance()) && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { continue; // already reached replication limit } @@ -2045,10 +2117,10 @@ public class BlockManager implements BlockStatsMXBean { continue; } NumberReplicas num = countNodes(timedOutItems[i]); - if (isNeededReconstruction(bi, num.liveReplicas())) { + if (isNeededReconstruction(bi, num)) { neededReconstruction.add(bi, num.liveReplicas(), - num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), - getRedundancy(bi)); + num.readOnlyReplicas(), num.outOfServiceReplicas(), + getExpectedRedundancyNum(bi)); } } } finally { @@ -3014,10 +3086,9 @@ public class BlockManager implements BlockStatsMXBean { // handle low redundancy/extra redundancy short fileRedundancy = getExpectedRedundancyNum(storedBlock); - if (!isNeededReconstruction(storedBlock, numCurrentReplica)) { + if (!isNeededReconstruction(storedBlock, num, pendingNum)) { neededReconstruction.remove(storedBlock, numCurrentReplica, - num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), fileRedundancy); + num.readOnlyReplicas(), num.outOfServiceReplicas(), fileRedundancy); } else { updateNeededReconstructions(storedBlock, curReplicaDelta, 0); } @@ -3040,6 +3111,10 @@ public class BlockManager implements BlockStatsMXBean { return storedBlock; } + // If there is any maintenance replica, we don't have to restore + // the condition of live + maintenance == expected. We allow + // live + maintenance >= expected. The extra redundancy will be removed + // when the maintenance node changes to live. 
private boolean shouldProcessExtraRedundancy(NumberReplicas num, int expectedNum) { final int numCurrent = num.liveReplicas(); @@ -3255,9 +3330,9 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas num = countNodes(block); final int numCurrentReplica = num.liveReplicas(); // add to low redundancy queue if need to be - if (isNeededReconstruction(block, numCurrentReplica)) { + if (isNeededReconstruction(block, num)) { if (neededReconstruction.add(block, numCurrentReplica, - num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), + num.readOnlyReplicas(), num.outOfServiceReplicas(), expectedRedundancy)) { return MisReplicationResult.UNDER_REPLICATED; } @@ -3290,9 +3365,9 @@ public class BlockManager implements BlockStatsMXBean { // update neededReconstruction priority queues b.setReplication(newRepl); + NumberReplicas num = countNodes(b); updateNeededReconstructions(b, 0, newRepl - oldRepl); - - if (oldRepl > newRepl) { + if (shouldProcessExtraRedundancy(num, newRepl)) { processExtraRedundancyBlock(b, newRepl, null, null); } } @@ -3318,14 +3393,14 @@ public class BlockManager implements BlockStatsMXBean { } final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (storage.areBlockContentsStale()) { - LOG.trace("BLOCK* processOverReplicatedBlock: Postponing {}" + LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}" + " since storage {} does not yet have up-to-date information.", block, storage); postponeBlock(block); return; } if (!isExcess(cur, block)) { - if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { + if (cur.isInService()) { // exclude corrupt replicas if (corruptNodes == null || !corruptNodes.contains(cur)) { nonExcess.add(storage); @@ -3766,7 +3841,7 @@ public class BlockManager implements BlockStatsMXBean { return countNodes(b, false); } - private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) { + NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) { 
NumberReplicas numberReplicas = new NumberReplicas(); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); if (b.isStriped()) { @@ -3797,6 +3872,12 @@ public class BlockManager implements BlockStatsMXBean { s = StoredReplicaState.DECOMMISSIONING; } else if (node.isDecommissioned()) { s = StoredReplicaState.DECOMMISSIONED; + } else if (node.isMaintenance()) { + if (node.isInMaintenance() || !node.isAlive()) { + s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ; + } else { + s = StoredReplicaState.MAINTENANCE_FOR_READ; + } } else if (isExcess(node, b)) { s = StoredReplicaState.EXCESS; } else { @@ -3868,11 +3949,11 @@ public class BlockManager implements BlockStatsMXBean { } /** - * On stopping decommission, check if the node has excess replicas. + * On putting the node in service, check if the node has excess replicas. * If there are any excess replicas, call processExtraRedundancyBlock(). * Process extra redundancy blocks only when active NN is out of safe mode. */ - void processExtraRedundancyBlocksOnReCommission( + void processExtraRedundancyBlocksOnInService( final DatanodeDescriptor srcNode) { if (!isPopulatingReplQueues()) { return; @@ -3881,7 +3962,7 @@ public class BlockManager implements BlockStatsMXBean { int numExtraRedundancy = 0; while(it.hasNext()) { final BlockInfo block = it.next(); - int expectedReplication = this.getRedundancy(block); + int expectedReplication = this.getExpectedRedundancyNum(block); NumberReplicas num = countNodes(block); if (shouldProcessExtraRedundancy(num, expectedReplication)) { // extra redundancy block @@ -3891,14 +3972,15 @@ public class BlockManager implements BlockStatsMXBean { } } LOG.info("Invalidated " + numExtraRedundancy - + " extra redundancy blocks on " + srcNode + " during recommissioning"); + + " extra redundancy blocks on " + srcNode + " after it is in service"); } /** - * Returns whether a node can be safely decommissioned based on its - * liveness. 
Dead nodes cannot always be safely decommissioned. + * Returns whether a node can be safely decommissioned or in maintenance + * based on its liveness. Dead nodes cannot always be safely decommissioned + * or in maintenance. */ - boolean isNodeHealthyForDecommission(DatanodeDescriptor node) { + boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) { if (!node.checkBlockReportReceived()) { LOG.info("Node {} hasn't sent its first block report.", node); return false; @@ -3912,17 +3994,18 @@ public class BlockManager implements BlockStatsMXBean { if (pendingReconstructionBlocksCount == 0 && lowRedundancyBlocksCount == 0) { LOG.info("Node {} is dead and there are no low redundancy" + - " blocks or blocks pending reconstruction. Safe to decommission.", - node); + " blocks or blocks pending reconstruction. Safe to decommission or", + " put in maintenance.", node); return true; } LOG.warn("Node {} is dead " + - "while decommission is in progress. Cannot be safely " + - "decommissioned since there is risk of reduced " + - "data durability or data loss. Either restart the failed node or" + - " force decommissioning by removing, calling refreshNodes, " + - "then re-adding to the excludes files.", node); + "while in {}. Cannot be safely " + + "decommissioned or be in maintenance since there is risk of reduced " + + "data durability or data loss. 
Either restart the failed node or " + + "force decommissioning or maintenance by removing, calling " + + "refreshNodes, then re-adding to the excludes or host config files.", + node, node.getAdminState()); return false; } @@ -3990,17 +4073,16 @@ public class BlockManager implements BlockStatsMXBean { } NumberReplicas repl = countNodes(block); int pendingNum = pendingReconstruction.getNumReplicas(block); - int curExpectedReplicas = getRedundancy(block); - if (!hasEnoughEffectiveReplicas(block, repl, pendingNum, - curExpectedReplicas)) { + int curExpectedReplicas = getExpectedRedundancyNum(block); + if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) { neededReconstruction.update(block, repl.liveReplicas() + pendingNum, - repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), + repl.readOnlyReplicas(), repl.outOfServiceReplicas(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(), - repl.decommissionedAndDecommissioning(), oldExpectedReplicas); + repl.outOfServiceReplicas(), oldExpectedReplicas); } } finally { namesystem.writeUnlock(); @@ -4018,24 +4100,15 @@ public class BlockManager implements BlockStatsMXBean { short expected = getExpectedRedundancyNum(block); final NumberReplicas n = countNodes(block); final int pending = pendingReconstruction.getNumReplicas(block); - if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { + if (!hasEnoughEffectiveReplicas(block, n, pending)) { neededReconstruction.add(block, n.liveReplicas() + pending, - n.readOnlyReplicas(), - n.decommissionedAndDecommissioning(), expected); + n.readOnlyReplicas(), n.outOfServiceReplicas(), expected); } else if (shouldProcessExtraRedundancy(n, expected)) { processExtraRedundancyBlock(block, expected, null, null); } } } - /** - * @return 0 
if the block is not found; - * otherwise, return the replication factor of the block. - */ - private int getRedundancy(BlockInfo block) { - return getExpectedRedundancyNum(block); - } - /** * Get blocks to invalidate for <i>nodeId</i> * in {@link #invalidateBlocks}. @@ -4088,6 +4161,8 @@ public class BlockManager implements BlockStatsMXBean { .getNodes(storedBlock); for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); + // Nodes under maintenance should be counted as valid replicas from + // rack policy point of view. if (!cur.isDecommissionInProgress() && !cur.isDecommissioned() && ((corruptNodes == null) || !corruptNodes.contains(cur))) { liveNodes.add(cur); @@ -4102,14 +4177,36 @@ public class BlockManager implements BlockStatsMXBean { .isPlacementPolicySatisfied(); } + boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock, + NumberReplicas numberReplicas) { + return storedBlock.isComplete() && (numberReplicas.liveReplicas() < + getMinMaintenanceStorageNum(storedBlock) || + !isPlacementPolicySatisfied(storedBlock)); + } + + boolean isNeededReconstruction(BlockInfo storedBlock, + NumberReplicas numberReplicas) { + return isNeededReconstruction(storedBlock, numberReplicas, 0); + } + /** * A block needs reconstruction if the number of redundancies is less than * expected or if it does not have enough racks. */ - boolean isNeededReconstruction(BlockInfo storedBlock, int current) { - int expected = getExpectedRedundancyNum(storedBlock); - return storedBlock.isComplete() - && (current < expected || !isPlacementPolicySatisfied(storedBlock)); + boolean isNeededReconstruction(BlockInfo storedBlock, + NumberReplicas numberReplicas, int pending) { + return storedBlock.isComplete() && + !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending); + } + + // Exclude maintenance, but make sure it has minimal live replicas + // to satisfy the maintenance requirement. 
+ public short getExpectedLiveRedundancyNum(BlockInfo block, + NumberReplicas numberReplicas) { + final short expectedRedundancy = getExpectedRedundancyNum(block); + return (short)Math.max(expectedRedundancy - + numberReplicas.maintenanceReplicas(), + getMinMaintenanceStorageNum(block)); } public short getExpectedRedundancyNum(BlockInfo block) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 3958c73..0390546 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -833,8 +833,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { List<DatanodeStorageInfo> results, boolean avoidStaleNodes) { // check if the node is (being) decommissioned - if (node.isDecommissionInProgress() || node.isDecommissioned()) { - logNodeIsNotChosen(node, "the node is (being) decommissioned "); + if (!node.isInService()) { + logNodeIsNotChosen(node, "the node isn't in service."); return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index ca8d72a..8563cf3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -682,7 +682,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { if (datanode == null) { continue; } - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { + if (!datanode.isInService()) { continue; } if (corrupt != null && corrupt.contains(datanode)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 6d163ec..f7da52a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -146,8 +146,8 @@ public class DatanodeDescriptor extends DatanodeInfo { // Stores status of decommissioning. // If node is not decommissioning, do not use this object for anything. 
- public final DecommissioningStatus decommissioningStatus = - new DecommissioningStatus(); + private final LeavingServiceStatus leavingServiceStatus = + new LeavingServiceStatus(); private final Map<String, DatanodeStorageInfo> storageMap = new HashMap<>(); @@ -276,6 +276,10 @@ public class DatanodeDescriptor extends DatanodeInfo { this.needKeyUpdate = needKeyUpdate; } + public LeavingServiceStatus getLeavingServiceStatus() { + return leavingServiceStatus; + } + @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -729,51 +733,54 @@ public class DatanodeDescriptor extends DatanodeInfo { return (this == obj) || super.equals(obj); } - /** Decommissioning status */ - public class DecommissioningStatus { + /** Leaving service status. */ + public class LeavingServiceStatus { private int underReplicatedBlocks; - private int decommissionOnlyReplicas; + private int outOfServiceOnlyReplicas; private int underReplicatedInOpenFiles; private long startTime; synchronized void set(int underRep, int onlyRep, int underConstruction) { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return; } underReplicatedBlocks = underRep; - decommissionOnlyReplicas = onlyRep; + outOfServiceOnlyReplicas = onlyRep; underReplicatedInOpenFiles = underConstruction; } /** @return the number of under-replicated blocks */ public synchronized int getUnderReplicatedBlocks() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return underReplicatedBlocks; } - /** @return the number of decommission-only replicas */ - public synchronized int getDecommissionOnlyReplicas() { - if (!isDecommissionInProgress()) { + /** @return the number of blocks with out-of-service-only replicas */ + public synchronized int getOutOfServiceOnlyReplicas() { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } - return 
decommissionOnlyReplicas; + return outOfServiceOnlyReplicas; } /** @return the number of under-replicated blocks in open files */ public synchronized int getUnderReplicatedInOpenFiles() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return underReplicatedInOpenFiles; } /** Set start time */ public synchronized void setStartTime(long time) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { + return; + } startTime = time; } /** @return start time */ public synchronized long getStartTime() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return startTime; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2d6547f..1a47835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -388,8 +388,8 @@ public class DatanodeManager { public void sortLocatedBlocks(final String targetHost, final List<LocatedBlock> locatedBlocks) { Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ? 
- new DFSUtil.DecomStaleComparator(staleInterval) : - DFSUtil.DECOM_COMPARATOR; + new DFSUtil.ServiceAndStaleComparator(staleInterval) : + new DFSUtil.ServiceComparator(); // sort located block for (LocatedBlock lb : locatedBlocks) { if (lb.isStriped()) { @@ -632,9 +632,20 @@ public class DatanodeManager { * @param nodeInfo datanode descriptor. */ private void removeDatanode(DatanodeDescriptor nodeInfo) { + removeDatanode(nodeInfo, true); + } + + /** + * Remove a datanode descriptor. + * @param nodeInfo datanode descriptor. + */ + private void removeDatanode(DatanodeDescriptor nodeInfo, + boolean removeBlocksFromBlocksMap) { assert namesystem.hasWriteLock(); heartbeatManager.removeDatanode(nodeInfo); - blockManager.removeBlocksAssociatedTo(nodeInfo); + if (removeBlocksFromBlocksMap) { + blockManager.removeBlocksAssociatedTo(nodeInfo); + } networktopology.remove(nodeInfo); decrementVersionCount(nodeInfo.getSoftwareVersion()); blockManager.getBlockReportLeaseManager().unregister(nodeInfo); @@ -655,7 +666,7 @@ public class DatanodeManager { try { final DatanodeDescriptor descriptor = getDatanode(node); if (descriptor != null) { - removeDatanode(descriptor); + removeDatanode(descriptor, true); } else { NameNode.stateChangeLog.warn("BLOCK* removeDatanode: " + node + " does not exist"); @@ -666,7 +677,8 @@ public class DatanodeManager { } /** Remove a dead datanode. 
*/ - void removeDeadDatanode(final DatanodeID nodeID) { + void removeDeadDatanode(final DatanodeID nodeID, + boolean removeBlocksFromBlockMap) { DatanodeDescriptor d; try { d = getDatanode(nodeID); @@ -675,8 +687,9 @@ public class DatanodeManager { } if (d != null && isDatanodeDead(d)) { NameNode.stateChangeLog.info( - "BLOCK* removeDeadDatanode: lost heartbeat from " + d); - removeDatanode(d); + "BLOCK* removeDeadDatanode: lost heartbeat from " + d + + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap); + removeDatanode(d, removeBlocksFromBlockMap); } } @@ -1112,10 +1125,16 @@ public class DatanodeManager { } /** - * 1. Added to hosts --> no further work needed here. - * 2. Removed from hosts --> mark AdminState as decommissioned. - * 3. Added to exclude --> start decommission. - * 4. Removed from exclude --> stop decommission. + * Reload datanode membership and the desired admin operations from + * host files. If a node isn't allowed, hostConfigManager.isIncluded returns + * false and the node can't be used. + * If a node is allowed and the desired admin operation is defined, + * it will transition to the desired admin state. + * If a node is allowed and upgrade domain is defined, + * the upgrade domain will be set on the node. + * To use maintenance mode or upgrade domain, set + * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to + * CombinedHostFileManager.class. */ private void refreshDatanodes() { final Map<String, DatanodeDescriptor> copy; @@ -1125,17 +1144,17 @@ public class DatanodeManager { for (DatanodeDescriptor node : copy.values()) { // Check if not include. if (!hostConfigManager.isIncluded(node)) { - node.setDisallowed(true); // case 2. 
+ node.setDisallowed(true); } else { long maintenanceExpireTimeInMS = hostConfigManager.getMaintenanceExpirationTimeInMS(node); if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) { decomManager.startMaintenance(node, maintenanceExpireTimeInMS); } else if (hostConfigManager.isExcluded(node)) { - decomManager.startDecommission(node); // case 3. + decomManager.startDecommission(node); } else { decomManager.stopMaintenance(node); - decomManager.stopDecommission(node); // case 4. + decomManager.stopDecommission(node); } } node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 87b36da..b1cfd78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -201,7 +201,7 @@ public class DecommissionManager { LOG.info("Starting decommission of {} {} with {} blocks", node, storage, storage.numBlocks()); } - node.decommissioningStatus.setStartTime(monotonicNow()); + node.getLeavingServiceStatus().setStartTime(monotonicNow()); pendingNodes.add(node); } } else { @@ -222,7 +222,7 @@ public class DecommissionManager { // extra redundancy blocks will be detected and processed when // the dead node comes back and send in its full block report. 
if (node.isAlive()) { - blockManager.processExtraRedundancyBlocksOnReCommission(node); + blockManager.processExtraRedundancyBlocksOnInService(node); } // Remove from tracking in DecommissionManager pendingNodes.remove(node); @@ -246,6 +246,16 @@ public class DecommissionManager { if (!node.isMaintenance()) { // Update DN stats maintained by HeartbeatManager hbManager.startMaintenance(node); + // hbManager.startMaintenance will set dead node to IN_MAINTENANCE. + if (node.isEnteringMaintenance()) { + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + LOG.info("Starting maintenance of {} {} with {} blocks", + node, storage, storage.numBlocks()); + } + node.getLeavingServiceStatus().setStartTime(monotonicNow()); + } + // Track the node regardless of whether it is ENTERING_MAINTENANCE or + // IN_MAINTENANCE to support maintenance expiration. pendingNodes.add(node); } else { LOG.trace("startMaintenance: Node {} in {}, nothing to do." + @@ -264,8 +274,34 @@ public class DecommissionManager { // Update DN stats maintained by HeartbeatManager hbManager.stopMaintenance(node); - // TODO HDFS-9390 remove replicas from block maps - // or handle over replicated blocks. + // extra redundancy blocks will be detected and processed when + // the dead node comes back and sends in its full block report. + if (!node.isAlive()) { + // The node became dead when it was in maintenance, at which point + // the replicas weren't removed from block maps. + // When the node leaves maintenance, the replicas should be removed + // from the block maps to trigger the necessary replication to + // maintain the safety property of "# of live replicas + maintenance + // replicas" >= the expected redundancy. + blockManager.removeBlocksAssociatedTo(node); + } else { + // Even though putting nodes in maintenance mode doesn't cause live + // replicas to match expected replication factor, it is still possible + // to have over-replicated blocks when the node leaves maintenance mode. 
+ // First scenario: + // a. Node became dead when it is at AdminStates.NORMAL, thus + // block is replicated so that 3 replicas exist on other nodes. + // b. Admins put the dead node into maintenance mode and then + // have the node rejoin the cluster. + // c. Take the node out of maintenance mode. + // Second scenario: + // a. With replication factor 3, set one replica to maintenance node, + // thus block has 1 maintenance replica and 2 live replicas. + // b. Change the replication factor to 2. The block will still have + // 1 maintenance replica and 2 live replicas. + // c. Take the node out of maintenance mode. + blockManager.processExtraRedundancyBlocksOnInService(node); + } // Remove from tracking in DecommissionManager pendingNodes.remove(node); @@ -281,6 +317,11 @@ public class DecommissionManager { LOG.info("Decommissioning complete for node {}", dn); } + private void setInMaintenance(DatanodeDescriptor dn) { + dn.setInMaintenance(); + LOG.info("Node {} has entered maintenance mode.", dn); + } + /** * Checks whether a block is sufficiently replicated/stored for * decommissioning. For replicated blocks or striped blocks, full-strength @@ -288,20 +329,21 @@ public class DecommissionManager { * @return true if sufficient, else false. 
*/ private boolean isSufficient(BlockInfo block, BlockCollection bc, - NumberReplicas numberReplicas) { - final int numExpected = blockManager.getExpectedRedundancyNum(block); - final int numLive = numberReplicas.liveReplicas(); - if (numLive >= numExpected - && blockManager.isPlacementPolicySatisfied(block)) { + NumberReplicas numberReplicas, boolean isDecommission) { + if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) { // Block has enough replica, skip LOG.trace("Block {} does not need replication.", block); return true; } + final int numExpected = blockManager.getExpectedLiveRedundancyNum(block, + numberReplicas); + final int numLive = numberReplicas.liveReplicas(); + // Block is under-replicated - LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, + LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, numLive); - if (numExpected > numLive) { + if (isDecommission && numExpected > numLive) { if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) { // Can decom a UC block as long as there will still be minReplicas if (blockManager.hasMinStorage(block, numLive)) { @@ -346,11 +388,16 @@ public class DecommissionManager { + ", corrupt replicas: " + num.corruptReplicas() + ", decommissioned replicas: " + num.decommissioned() + ", decommissioning replicas: " + num.decommissioning() + + ", maintenance replicas: " + num.maintenanceReplicas() + + ", live entering maintenance replicas: " + + num.liveEnteringMaintenanceReplicas() + ", excess replicas: " + num.excessReplicas() + ", Is Open File: " + bc.isUnderConstruction() + ", Datanodes having this block: " + nodeList + ", Current Datanode: " + srcNode + ", Is current datanode decommissioning: " - + srcNode.isDecommissionInProgress()); + + srcNode.isDecommissionInProgress() + + ", Is current datanode entering maintenance: " + + srcNode.isEnteringMaintenance()); } @VisibleForTesting @@ -424,7 +471,7 @@ public class DecommissionManager { numBlocksChecked = 0; 
numBlocksCheckedPerLock = 0; numNodesChecked = 0; - // Check decom progress + // Check decommission or maintenance progress. namesystem.writeLock(); try { processPendingNodes(); @@ -464,15 +511,14 @@ final DatanodeDescriptor dn = entry.getKey(); AbstractList<BlockInfo> blocks = entry.getValue(); boolean fullScan = false; - if (dn.isMaintenance()) { - // TODO HDFS-9390 make sure blocks are minimally replicated - // before transitioning the node to IN_MAINTENANCE state. - + if (dn.isMaintenance() && dn.maintenanceExpired()) { // If maintenance expires, stop tracking it. - if (dn.maintenanceExpired()) { - stopMaintenance(dn); - toRemove.add(dn); - } + stopMaintenance(dn); + toRemove.add(dn); + continue; + } + if (dn.isInMaintenance()) { + // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet. continue; } if (blocks == null) { @@ -487,7 +533,7 @@ } else { // This is a known datanode, check if its # of insufficiently // replicated blocks has dropped to zero and if it can be decommed - LOG.debug("Processing decommission-in-progress node {}", dn); + LOG.debug("Processing {} node {}", dn.getAdminState(), dn); pruneReliableBlocks(dn, blocks); } if (blocks.size() == 0) { @@ -506,22 +552,31 @@ // If the full scan is clean AND the node liveness is okay, // we can finally mark as decommissioned. final boolean isHealthy = - blockManager.isNodeHealthyForDecommission(dn); + blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); if (blocks.size() == 0 && isHealthy) { + if (dn.isDecommissionInProgress()) { + setDecommissioned(dn); + toRemove.add(dn); + } else if (dn.isEnteringMaintenance()) { + // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks + // to track maintenance expiration.
+ setInMaintenance(dn); + } else { + Preconditions.checkState(false, + "A node is in an invalid state!"); + } LOG.debug("Node {} is sufficiently replicated and healthy, " - + "marked as decommissioned.", dn); + + "marked as {}.", dn.getAdminState()); } else { LOG.debug("Node {} {} healthy." + " It needs to replicate {} more blocks." - + " Decommissioning is still in progress.", - dn, isHealthy? "is": "isn't", blocks.size()); + + " {} is still in progress.", dn, + isHealthy? "is": "isn't", blocks.size(), dn.getAdminState()); } } else { LOG.debug("Node {} still has {} blocks to replicate " - + "before it is a candidate to finish decommissioning.", - dn, blocks.size()); + + "before it is a candidate to finish {}.", + dn, blocks.size(), dn.getAdminState()); } iterkey = dn; } @@ -539,7 +594,7 @@ public class DecommissionManager { */ private void pruneReliableBlocks(final DatanodeDescriptor datanode, AbstractList<BlockInfo> blocks) { - processBlocksForDecomInternal(datanode, blocks.iterator(), null, true); + processBlocksInternal(datanode, blocks.iterator(), null, true); } /** @@ -554,7 +609,7 @@ public class DecommissionManager { private AbstractList<BlockInfo> handleInsufficientlyStored( final DatanodeDescriptor datanode) { AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>(); - processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), + processBlocksInternal(datanode, datanode.getBlockIterator(), insufficient, false); return insufficient; } @@ -573,14 +628,14 @@ public class DecommissionManager { * @param pruneReliableBlocks whether to remove blocks reliable * enough from the iterator */ - private void processBlocksForDecomInternal( + private void processBlocksInternal( final DatanodeDescriptor datanode, final Iterator<BlockInfo> it, final List<BlockInfo> insufficientList, boolean pruneReliableBlocks) { boolean firstReplicationLog = true; int lowRedundancyBlocks = 0; - int decommissionOnlyReplicas = 0; + int outOfServiceOnlyReplicas = 0; int 
lowRedundancyInOpenFiles = 0; while (it.hasNext()) { if (insufficientList == null @@ -626,21 +681,25 @@ public class DecommissionManager { // Schedule low redundancy blocks for reconstruction if not already // pending - if (blockManager.isNeededReconstruction(block, liveReplicas)) { + boolean isDecommission = datanode.isDecommissionInProgress(); + boolean neededReconstruction = isDecommission ? + blockManager.isNeededReconstruction(block, num) : + blockManager.isNeededReconstructionForMaintenance(block, num); + if (neededReconstruction) { if (!blockManager.neededReconstruction.contains(block) && blockManager.pendingReconstruction.getNumReplicas(block) == 0 && blockManager.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. blockManager.neededReconstruction.add(block, liveReplicas, num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), + num.outOfServiceReplicas(), blockManager.getExpectedRedundancyNum(block)); } } // Even if the block is without sufficient redundancy, // it doesn't block decommission if has sufficient redundancy - if (isSufficient(block, bc, num)) { + if (isSufficient(block, bc, num, isDecommission)) { if (pruneReliableBlocks) { it.remove(); } @@ -662,14 +721,13 @@ public class DecommissionManager { if (bc.isUnderConstruction()) { lowRedundancyInOpenFiles++; } - if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { - decommissionOnlyReplicas++; + if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) { + outOfServiceOnlyReplicas++; } } - datanode.decommissioningStatus.set(lowRedundancyBlocks, - decommissionOnlyReplicas, - lowRedundancyInOpenFiles); + datanode.getLeavingServiceStatus().set(lowRedundancyBlocks, + outOfServiceOnlyReplicas, lowRedundancyInOpenFiles); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index 082e949..0ae6f0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -130,12 +130,14 @@ class ErasureCodingWork extends BlockReconstructionWork { // we only need to replicate one internal block to a new rack int sourceIndex = chooseSource4SimpleReplication(); createReplicationWork(sourceIndex, targets[0]); - } else if (numberReplicas.decommissioning() > 0 && hasAllInternalBlocks()) { - List<Integer> decommissioningSources = findDecommissioningSources(); + } else if ((numberReplicas.decommissioning() > 0 || + numberReplicas.liveEnteringMaintenanceReplicas() > 0) && + hasAllInternalBlocks()) { + List<Integer> leavingServiceSources = findLeavingServiceSources(); // decommissioningSources.size() should be >= targets.length - final int num = Math.min(decommissioningSources.size(), targets.length); + final int num = Math.min(leavingServiceSources.size(), targets.length); for (int i = 0; i < num; i++) { - createReplicationWork(decommissioningSources.get(i), targets[i]); + createReplicationWork(leavingServiceSources.get(i), targets[i]); } } else { targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( @@ -162,10 +164,12 @@ class ErasureCodingWork extends BlockReconstructionWork { } } - private List<Integer> findDecommissioningSources() { + private List<Integer> findLeavingServiceSources() { List<Integer> srcIndices = new ArrayList<>(); for (int i = 0; i < getSrcNodes().length; i++) { - if (getSrcNodes()[i].isDecommissionInProgress()) { + if 
(getSrcNodes()[i].isDecommissionInProgress() || + (getSrcNodes()[i].isEnteringMaintenance() && + getSrcNodes()[i].isAlive())) { srcIndices.add(i); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index d728ee2..a72ad64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.util.Daemon; @@ -269,13 +266,19 @@ class HeartbeatManager implements DatanodeStatistics { if (!node.isAlive()) { LOG.info("Dead node {} is put in maintenance state immediately.", node); node.setInMaintenance(); - } else if (node.isDecommissioned()) { - LOG.info("Decommissioned node " + node + " is put in maintenance state" - + " immediately."); - node.setInMaintenance(); } else { stats.subtract(node); - node.startMaintenance(); + if (node.isDecommissioned()) { + LOG.info("Decommissioned node " + node + " 
is put in maintenance state" + + " immediately."); + node.setInMaintenance(); + } else if (blockManager.getMinReplicationToBeInMaintenance() == 0) { + LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node + + " is put in maintenance state" + " immediately."); + node.setInMaintenance(); + } else { + node.startMaintenance(); + } stats.add(node); } } @@ -352,7 +355,7 @@ class HeartbeatManager implements DatanodeStatistics { boolean allAlive = false; while (!allAlive) { // locate the first dead node. - DatanodeID dead = null; + DatanodeDescriptor dead = null; // locate the first failed storage that isn't on a dead node. DatanodeStorageInfo failedStorage = null; @@ -401,7 +404,7 @@ class HeartbeatManager implements DatanodeStatistics { // acquire the fsnamesystem lock, and then remove the dead node. namesystem.writeLock(); try { - dm.removeDeadDatanode(dead); + dm.removeDeadDatanode(dead, !dead.isMaintenance()); } finally { namesystem.writeUnlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java index de8cf4e..3a26f4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java @@ -155,7 +155,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { private int getPriority(BlockInfo block, int curReplicas, int readOnlyReplicas, - int decommissionedReplicas, + int outOfServiceReplicas, int expectedReplicas) { assert 
curReplicas >= 0 : "Negative replicas!"; if (curReplicas >= expectedReplicas) { @@ -164,20 +164,20 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { } if (block.isStriped()) { BlockInfoStriped sblk = (BlockInfoStriped) block; - return getPriorityStriped(curReplicas, decommissionedReplicas, + return getPriorityStriped(curReplicas, outOfServiceReplicas, sblk.getRealDataBlockNum(), sblk.getParityBlockNum()); } else { return getPriorityContiguous(curReplicas, readOnlyReplicas, - decommissionedReplicas, expectedReplicas); + outOfServiceReplicas, expectedReplicas); } } private int getPriorityContiguous(int curReplicas, int readOnlyReplicas, - int decommissionedReplicas, int expectedReplicas) { + int outOfServiceReplicas, int expectedReplicas) { if (curReplicas == 0) { // If there are zero non-decommissioned replicas but there are - // some decommissioned replicas, then assign them highest priority - if (decommissionedReplicas > 0) { + // some out of service replicas, then assign them highest priority + if (outOfServiceReplicas > 0) { return QUEUE_HIGHEST_PRIORITY; } if (readOnlyReplicas > 0) { @@ -201,11 +201,11 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { } } - private int getPriorityStriped(int curReplicas, int decommissionedReplicas, + private int getPriorityStriped(int curReplicas, int outOfServiceReplicas, short dataBlkNum, short parityBlkNum) { if (curReplicas < dataBlkNum) { // There are some replicas on decommissioned nodes so it's not corrupted - if (curReplicas + decommissionedReplicas >= dataBlkNum) { + if (curReplicas + outOfServiceReplicas >= dataBlkNum) { return QUEUE_HIGHEST_PRIORITY; } return QUEUE_WITH_CORRUPT_BLOCKS; @@ -227,18 +227,15 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { * * @param block a low redundancy block * @param curReplicas current number of replicas of the block - * @param decomissionedReplicas the number of decommissioned replicas + * @param outOfServiceReplicas the number of out-of-service 
replicas * @param expectedReplicas expected number of replicas of the block * @return true if the block was added to a queue. */ synchronized boolean add(BlockInfo block, - int curReplicas, - int readOnlyReplicas, - int decomissionedReplicas, - int expectedReplicas) { - assert curReplicas >= 0 : "Negative replicas!"; + int curReplicas, int readOnlyReplicas, + int outOfServiceReplicas, int expectedReplicas) { final int priLevel = getPriority(block, curReplicas, readOnlyReplicas, - decomissionedReplicas, expectedReplicas); + outOfServiceReplicas, expectedReplicas); if(priorityQueues.get(priLevel).add(block)) { if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && expectedReplicas == 1) { @@ -257,12 +254,10 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { /** Remove a block from a low redundancy queue. */ synchronized boolean remove(BlockInfo block, - int oldReplicas, - int oldReadOnlyReplicas, - int decommissionedReplicas, - int oldExpectedReplicas) { + int oldReplicas, int oldReadOnlyReplicas, + int outOfServiceReplicas, int oldExpectedReplicas) { final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas, - decommissionedReplicas, oldExpectedReplicas); + outOfServiceReplicas, oldExpectedReplicas); boolean removedBlock = remove(block, priLevel); if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && oldExpectedReplicas == 1 && @@ -325,22 +320,22 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> { * method call. 
* @param block a low redundancy block * @param curReplicas current number of replicas of the block - * @param decommissionedReplicas the number of decommissioned replicas + * @param outOfServiceReplicas the number of out-of-service replicas * @param curExpectedReplicas expected number of replicas of the block * @param curReplicasDelta the change in the replicate count from before * @param expectedReplicasDelta the change in the expected replica count * from before */ synchronized void update(BlockInfo block, int curReplicas, - int readOnlyReplicas, int decommissionedReplicas, - int curExpectedReplicas, - int curReplicasDelta, int expectedReplicasDelta) { + int readOnlyReplicas, int outOfServiceReplicas, + int curExpectedReplicas, + int curReplicasDelta, int expectedReplicasDelta) { int oldReplicas = curReplicas-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; int curPri = getPriority(block, curReplicas, readOnlyReplicas, - decommissionedReplicas, curExpectedReplicas); + outOfServiceReplicas, curExpectedReplicas); int oldPri = getPriority(block, oldReplicas, readOnlyReplicas, - decommissionedReplicas, oldExpectedReplicas); + outOfServiceReplicas, oldExpectedReplicas); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " + block + http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java index 0198bcc..be984f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java @@ -24,9 +24,11 @@ import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.Store import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING; import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS; import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE; +import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_FOR_READ; +import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_NOT_FOR_READ; +import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY; import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT; import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE; -import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY; /** * A immutable object that stores the number of live replicas and @@ -41,6 +43,14 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta READONLY, DECOMMISSIONING, DECOMMISSIONED, + // We need live ENTERING_MAINTENANCE nodes to continue + // to serve read request while it is being transitioned to live + // IN_MAINTENANCE if these are the only replicas left. + // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas - + // Live ENTERING_MAINTENANCE. + MAINTENANCE_NOT_FOR_READ, + // Live ENTERING_MAINTENANCE nodes to serve read requests. 
+ MAINTENANCE_FOR_READ, CORRUPT, // excess replicas already tracked by blockmanager's excess map EXCESS, @@ -106,4 +116,20 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta public int redundantInternalBlocks() { return (int) get(REDUNDANT); } -} + + public int maintenanceNotForReadReplicas() { + return (int) get(MAINTENANCE_NOT_FOR_READ); + } + + public int maintenanceReplicas() { + return (int) (get(MAINTENANCE_NOT_FOR_READ) + get(MAINTENANCE_FOR_READ)); + } + + public int outOfServiceReplicas() { + return maintenanceReplicas() + decommissionedAndDecommissioning(); + } + + public int liveEnteringMaintenanceReplicas() { + return (int)get(MAINTENANCE_FOR_READ); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java index 45dcc8d..005e6d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java @@ -81,7 +81,7 @@ public class StorageTypeStats { final DatanodeDescriptor node) { capacityUsed += info.getDfsUsed(); blockPoolUsed += info.getBlockPoolUsed(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityTotal += info.getCapacity(); capacityRemaining += info.getRemaining(); } else { @@ -90,7 +90,7 @@ public class StorageTypeStats { } void addNode(final DatanodeDescriptor node) { - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if 
(node.isInService()) { nodesInService++; } } @@ -99,7 +99,7 @@ public class StorageTypeStats { final DatanodeDescriptor node) { capacityUsed -= info.getDfsUsed(); blockPoolUsed -= info.getBlockPoolUsed(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityTotal -= info.getCapacity(); capacityRemaining -= info.getRemaining(); } else { @@ -108,7 +108,7 @@ public class StorageTypeStats { } void subtractNode(final DatanodeDescriptor node) { - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { nodesInService--; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 563682f..eb870f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5462,11 +5462,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, .<String, Object> builder() .put("xferaddr", node.getXferAddr()) .put("underReplicatedBlocks", - node.decommissioningStatus.getUnderReplicatedBlocks()) + node.getLeavingServiceStatus().getUnderReplicatedBlocks()) + // TODO use another property name for outOfServiceOnlyReplicas. 
.put("decommissionOnlyReplicas", - node.decommissioningStatus.getDecommissionOnlyReplicas()) + node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas()) .put("underReplicateInOpenFiles", - node.decommissioningStatus.getUnderReplicatedInOpenFiles()) + node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles()) .build(); info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo); } @@ -5528,7 +5529,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockManager.getDatanodeManager().fetchDatanodes(live, null, true); for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) { DatanodeDescriptor node = it.next(); - if (node.isDecommissionInProgress() || node.isDecommissioned()) { + if (!node.isInService()) { it.remove(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 84b51f6..483663e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -548,6 +548,13 @@ </property> <property> + <name>dfs.namenode.maintenance.replication.min</name> + <value>1</value> + <description>Minimal live block replication in existence of maintenance mode. 
+ </description> +</property> + +<property> <name>dfs.namenode.safemode.replication.min</name> <value></value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java index 0698628..534c5e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -102,6 +102,7 @@ public class AdminStatesBaseTest { conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); hostsFileWriter.initialize(conf, "temp/admin"); + } @After @@ -110,17 +111,22 @@ public class AdminStatesBaseTest { shutdownCluster(); } - protected void writeFile(FileSystem fileSys, Path name, int repl) + static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys, + Path name, short repl, short numOfBlocks) throws IOException { + return writeFile(fileSys, name, repl, numOfBlocks, false); + } + + static protected void writeFile(FileSystem fileSys, Path name, int repl) throws IOException { writeFile(fileSys, name, repl, 2); } - protected void writeFile(FileSystem fileSys, Path name, int repl, + static protected void writeFile(FileSystem fileSys, Path name, int repl, int numOfBlocks) throws IOException { writeFile(fileSys, name, repl, numOfBlocks, true); } - protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, + static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, int repl, int numOfBlocks, boolean completeFile) throws IOException { // create and write a file that contains two blocks of data @@ -136,6 +142,7 @@ public 
class AdminStatesBaseTest { stm.close(); return null; } else { + stm.flush(); // Do not close stream, return it // so that it is not garbage collected return stm; @@ -353,7 +360,7 @@ public class AdminStatesBaseTest { protected void shutdownCluster() { if (cluster != null) { - cluster.shutdown(); + cluster.shutdown(true); } } @@ -362,12 +369,13 @@ public class AdminStatesBaseTest { refreshNodes(conf); } - protected DatanodeDescriptor getDatanodeDesriptor( + static private DatanodeDescriptor getDatanodeDesriptor( final FSNamesystem ns, final String datanodeUuid) { return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); } - protected void cleanupFile(FileSystem fileSys, Path name) throws IOException { + static public void cleanupFile(FileSystem fileSys, Path name) + throws IOException { assertTrue(fileSys.exists(name)); fileSys.delete(name, true); assertTrue(!fileSys.exists(name)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index ddb8237..6ca1e79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -484,7 +484,7 @@ public class TestDecommission extends AdminStatesBaseTest { shutdownCluster(); } } - + /** * Tests cluster storage statistics during decommissioning for non * federated cluster --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org