HDFS-7369. Erasure coding: distribute recovery work for striped blocks to DataNode. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/25806c07 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/25806c07 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/25806c07 Branch: refs/heads/HDFS-7285 Commit: 25806c074fc91a89e190fb9e45a90357bf12ca12 Parents: 1b31c21 Author: Zhe Zhang <z...@apache.org> Authored: Wed Mar 18 15:52:36 2015 -0700 Committer: Zhe Zhang <z...@apache.org> Committed: Tue Mar 24 11:16:35 2015 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockCollection.java | 5 + .../server/blockmanagement/BlockManager.java | 296 +++++++++++++------ .../blockmanagement/DatanodeDescriptor.java | 72 ++++- .../server/blockmanagement/DatanodeManager.java | 20 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 9 +- .../server/protocol/BlockECRecoveryCommand.java | 63 ++++ .../hdfs/server/protocol/DatanodeProtocol.java | 1 + .../blockmanagement/BlockManagerTestUtil.java | 2 +- .../blockmanagement/TestBlockManager.java | 22 +- .../TestRecoverStripedBlocks.java | 107 +++++++ 10 files changed, 486 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 1c753de..62a5781 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -86,4 +86,9 @@ public interface BlockCollection { * @return whether the block collection is under construction. */ public boolean isUnderConstruction(); + + /** + * @return whether the block collection is in striping format + */ + public boolean isStriped(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index f7a00f0..300b767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -531,9 +532,9 @@ public class BlockManager { NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used - chooseSourceDatanode(block, containingNodes, + chooseSourceDatanodes(getStoredBlock(block), containingNodes, containingLiveReplicasNodes, numReplicas, - UnderReplicatedBlocks.LEVEL); + null, 1, UnderReplicatedBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the 
numReplicas.liveReplicas() count @@ -1327,15 +1328,15 @@ public class BlockManager { } /** - * Scan blocks in {@link #neededReplications} and assign replication - * work to data-nodes they belong to. + * Scan blocks in {@link #neededReplications} and assign recovery + * (replication or erasure coding) work to data-nodes they belong to. * * The number of process blocks equals either twice the number of live * data-nodes or the number of under-replicated blocks whichever is less. * * @return number of blocks scheduled for replication during this iteration. */ - int computeReplicationWork(int blocksToProcess) { + int computeBlockRecoveryWork(int blocksToProcess) { List<List<BlockInfo>> blocksToReplicate = null; namesystem.writeLock(); try { @@ -1345,30 +1346,32 @@ public class BlockManager { } finally { namesystem.writeUnlock(); } - return computeReplicationWorkForBlocks(blocksToReplicate); + return computeRecoveryWorkForBlocks(blocksToReplicate); } - /** Replicate a set of blocks + /** + * Recover a set of blocks to full strength through replication or + * erasure coding * - * @param blocksToReplicate blocks to be replicated, for each priority + * @param blocksToRecover blocks to be recovered, for each priority * @return the number of blocks scheduled for replication */ @VisibleForTesting - int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) { + int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) { int requiredReplication, numEffectiveReplicas; List<DatanodeDescriptor> containingNodes; - DatanodeDescriptor srcNode; BlockCollection bc = null; int additionalReplRequired; int scheduledWork = 0; - List<ReplicationWork> work = new LinkedList<ReplicationWork>(); + List<BlockRecoveryWork> recovWork = new LinkedList<>(); + // Step 1: categorize at-risk blocks into replication and EC tasks namesystem.writeLock(); try { synchronized (neededReplications) { - for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - 
for (BlockInfo block : blocksToReplicate.get(priority)) { + for (int priority = 0; priority < blocksToRecover.size(); priority++) { + for (BlockInfo block : blocksToRecover.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append @@ -1382,25 +1385,31 @@ public class BlockManager { requiredReplication = bc.getBlockReplication(); // get a source data-node - containingNodes = new ArrayList<DatanodeDescriptor>(); - List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>(); + containingNodes = new ArrayList<>(); + List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); - srcNode = chooseSourceDatanode( + List<Short> missingBlockIndices = new LinkedList<>(); + DatanodeDescriptor[] srcNodes; + int numSourceNodes = bc.isStriped() ? + HdfsConstants.NUM_DATA_BLOCKS : 1; + srcNodes = chooseSourceDatanodes( block, containingNodes, liveReplicaNodes, numReplicas, - priority); - if(srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); + missingBlockIndices, numSourceNodes, priority); + if(srcNodes == null || srcNodes.length == 0) { + // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be recovered " + + "from any node"); continue; } - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); - + if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || (blockHasEnoughRacks(block)) ) { @@ -1417,9 +1426,21 @@ public 
class BlockManager { } else { additionalReplRequired = 1; // Needed on a new rack } - work.add(new ReplicationWork(block, bc, srcNode, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority)); + if (bc.isStriped()) { + ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority); + short[] missingBlockArray = new short[missingBlockIndices.size()]; + for (int i = 0 ; i < missingBlockIndices.size(); i++) { + missingBlockArray[i] = missingBlockIndices.get(i); + } + ecw.setMissingBlockIndices(missingBlockArray); + recovWork.add(ecw); + } else { + recovWork.add(new ReplicationWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority)); + } } } } @@ -1427,8 +1448,9 @@ public class BlockManager { namesystem.writeUnlock(); } + // Step 2: choose target nodes for each recovery task final Set<Node> excludedNodes = new HashSet<Node>(); - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. 
excludedNodes.clear(); @@ -1442,9 +1464,10 @@ public class BlockManager { rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); } + // Step 3: add tasks to the DN namesystem.writeLock(); try { - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ final DatanodeStorageInfo[] targets = rw.targets; if(targets == null || targets.length == 0){ rw.targets = null; @@ -1483,7 +1506,7 @@ public class BlockManager { if ( (numReplicas.liveReplicas() >= requiredReplication) && (!blockHasEnoughRacks(block)) ) { - if (rw.srcNode.getNetworkLocation().equals( + if (rw.srcNodes[0].getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case continue; @@ -1491,7 +1514,17 @@ public class BlockManager { } // Add block to the to be replicated list - rw.srcNode.addBlockToBeReplicated(block, targets); + if (bc.isStriped()) { + assert rw instanceof ErasureCodingWork; + assert rw.targets.length > 0; + rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( + new ExtendedBlock(namesystem.getBlockPoolId(), block), + rw.srcNodes, rw.targets, + ((ErasureCodingWork)rw).getMissingBlockIndicies()); + } + else { + rw.srcNodes[0].addBlockToBeReplicated(block, targets); + } scheduledWork++; DatanodeStorageInfo.incrementBlocksScheduled(targets); @@ -1515,7 +1548,7 @@ public class BlockManager { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); @@ -1523,7 +1556,7 @@ public class BlockManager { targetList.append(' '); targetList.append(targets[k].getDatanodeDescriptor()); } - blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode, + blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, rw.block, 
targetList); } } @@ -1609,54 +1642,65 @@ public class BlockManager { } /** - * Parse the data-nodes the block belongs to and choose one, - * which will be the replication source. + * Parse the data-nodes the block belongs to and choose a certain number + * from them to be the recovery sources. * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. * We do not use already decommissioned nodes as a source. - * Otherwise we choose a random node among those that did not reach their - * replication limits. However, if the replication is of the highest priority - * and all nodes have reached their replication limits, we will choose a - * random node despite the replication limit. + * Otherwise we randomly choose nodes among those that did not reach their + * replication limits. However, if the recovery work is of the highest + * priority and all nodes have reached their replication limits, we will + * randomly choose the desired number of nodes despite the replication limit. * * In addition form a list of all nodes containing the block * and calculate its replication numbers. * * @param block Block for which a replication source is needed - * @param containingNodes List to be populated with nodes found to contain the - * given block - * @param nodesContainingLiveReplicas List to be populated with nodes found to - * contain live replicas of the given block - * @param numReplicas NumberReplicas instance to be initialized with the - * counts of live, corrupt, excess, and - * decommissioned replicas of the given - * block. 
+ * @param containingNodes List to be populated with nodes found to contain + * the given block + * @param nodesContainingLiveReplicas List to be populated with nodes found + * to contain live replicas of the given + * block + * @param numReplicas NumberReplicas instance to be initialized with the + * counts of live, corrupt, excess, and decommissioned + * replicas of the given block. + * @param missingBlockIndices List to be populated with indices of missing + * blocks in a striped block group or missing + * replicas of a replicated block + * @param numSourceNodes integer specifying the number of source nodes to + * choose * @param priority integer representing replication priority of the given * block - * @return the DatanodeDescriptor of the chosen node from which to replicate - * the given block - */ - @VisibleForTesting - DatanodeDescriptor chooseSourceDatanode(Block block, - List<DatanodeDescriptor> containingNodes, - List<DatanodeStorageInfo> nodesContainingLiveReplicas, - NumberReplicas numReplicas, - int priority) { + * @return the array of DatanodeDescriptor of the chosen nodes from which to + * recover the given block + */ + @VisibleForTesting + DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, + List<DatanodeDescriptor> containingNodes, + List<DatanodeStorageInfo> nodesContainingLiveReplicas, + NumberReplicas numReplicas, + List<Short> missingBlockIndices, int numSourceNodes, int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); - DatanodeDescriptor srcNode = null; + LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>(); int live = 0; int decommissioned = 0; int corrupt = 0; int excess = 0; - + missingBlockIndices.clear(); + Set<Short> healthyIndices = new HashSet<>(); + Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + if (block.isStriped()) { + healthyIndices.add((short) ((BlockInfoStriped) block). 
+ getStorageBlockIndex(storage)); + } final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); - int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; + int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt += countableReplica; else if (node.isDecommissionInProgress() || node.isDecommissioned()) @@ -1688,21 +1732,32 @@ public class BlockManager { if(node.isDecommissioned()) continue; // we prefer nodes that are in DECOMMISSION_INPROGRESS state - if(node.isDecommissionInProgress() || srcNode == null) { - srcNode = node; + if(node.isDecommissionInProgress() || srcNodes.size() < numSourceNodes) { + srcNodes.add(node); continue; } - if(srcNode.isDecommissionInProgress()) - continue; // switch to a different node randomly // this to prevent from deterministically selecting the same node even // if the node failed to replicate the block on previous iterations - if(DFSUtil.getRandom().nextBoolean()) - srcNode = node; + if(DFSUtil.getRandom().nextBoolean()) { + int pos = DFSUtil.getRandom().nextInt(numSourceNodes); + if(!srcNodes.get(pos).isDecommissionInProgress()) { + srcNodes.set(pos, node); + } + } + } + if (block.isStriped()) { + for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; i++) { + if (!healthyIndices.contains(i)) { + missingBlockIndices.add(i); + } + } } - if(numReplicas != null) + if(numReplicas != null) { numReplicas.initialize(live, decommissioned, corrupt, excess, 0); - return srcNode; + } + return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]); } /** @@ -1732,7 +1787,7 @@ public class BlockManager { */ } } - + /** * StatefulBlockInfo is used to build the "toUC" list, which is a list of * updates to the information about under-construction blocks. 
@@ -3624,7 +3679,7 @@ public class BlockManager { } /** - * Periodically calls computeReplicationWork(). + * Periodically calls computeBlockRecoveryWork(). */ private class ReplicationMonitor implements Runnable { @@ -3682,7 +3737,7 @@ public class BlockManager { final int nodesToProcess = (int) Math.ceil(numlive * this.blocksInvalidateWorkPct); - int workFound = this.computeReplicationWork(blocksToProcess); + int workFound = this.computeBlockRecoveryWork(blocksToProcess); // Update counters namesystem.writeLock(); @@ -3709,49 +3764,118 @@ public class BlockManager { postponedMisreplicatedBlocks.clear(); postponedMisreplicatedBlocksCount.set(0); } - - - private static class ReplicationWork { - private final BlockInfo block; - private final BlockCollection bc; + /** + * This class is used internally by {@link this#computeRecoveryWorkForBlocks} + * to represent a task to recover a block through replication or erasure + * coding. Recovery is done by transferring data from {@link srcNodes} to + * {@link targets} + */ + private static class BlockRecoveryWork { + protected final BlockInfo block; + protected final BlockCollection bc; - private final DatanodeDescriptor srcNode; - private final List<DatanodeDescriptor> containingNodes; - private final List<DatanodeStorageInfo> liveReplicaStorages; - private final int additionalReplRequired; + /** + * An erasure coding recovery task has multiple source nodes. 
+ * A replication task only has 1 source node, stored on top of the array + */ + protected final DatanodeDescriptor[] srcNodes; + /** Nodes containing the block; avoid them in choosing new targets */ + protected final List<DatanodeDescriptor> containingNodes; + /** Required by {@link BlockPlacementPolicy#chooseTarget} */ + protected final List<DatanodeStorageInfo> liveReplicaStorages; + protected final int additionalReplRequired; - private DatanodeStorageInfo targets[]; - private final int priority; + protected DatanodeStorageInfo[] targets; + protected final int priority; - public ReplicationWork(BlockInfo block, + public BlockRecoveryWork(BlockInfo block, BlockCollection bc, - DatanodeDescriptor srcNode, + DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, int priority) { this.block = block; this.bc = bc; - this.srcNode = srcNode; - this.srcNode.incrementPendingReplicationWithoutTargets(); + this.srcNodes = srcNodes; this.containingNodes = containingNodes; this.liveReplicaStorages = liveReplicaStorages; this.additionalReplRequired = additionalReplRequired; this.priority = priority; this.targets = null; } - - private void chooseTargets(BlockPlacementPolicy blockplacement, + + protected void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set<Node> excludedNodes) { + } + } + + private static class ReplicationWork extends BlockRecoveryWork { + + public ReplicationWork(BlockInfo block, + BlockCollection bc, + DatanodeDescriptor[] srcNodes, + List<DatanodeDescriptor> containingNodes, + List<DatanodeStorageInfo> liveReplicaStorages, + int additionalReplRequired, + int priority) { + super(block, bc, srcNodes, containingNodes, + liveReplicaStorages, additionalReplRequired, priority); + LOG.debug("Creating a ReplicationWork to recover " + block); + } + + protected void chooseTargets(BlockPlacementPolicy blockplacement, + 
BlockStoragePolicySuite storagePolicySuite, + Set<Node> excludedNodes) { + assert srcNodes.length > 0 + : "At least 1 source node should have been selected"; + try { + targets = blockplacement.chooseTarget(bc.getName(), + additionalReplRequired, srcNodes[0], liveReplicaStorages, false, + excludedNodes, block.getNumBytes(), + storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + } finally { + srcNodes[0].decrementPendingReplicationWithoutTargets(); + } + } + } + + private static class ErasureCodingWork extends BlockRecoveryWork { + + private short[] missingBlockIndicies = null; + + public ErasureCodingWork(BlockInfo block, + BlockCollection bc, + DatanodeDescriptor[] srcNodes, + List<DatanodeDescriptor> containingNodes, + List<DatanodeStorageInfo> liveReplicaStorages, + int additionalReplRequired, + int priority) { + super(block, bc, srcNodes, containingNodes, + liveReplicaStorages, additionalReplRequired, priority); + LOG.debug("Creating an ErasureCodingWork to recover " + block); + } + + public short[] getMissingBlockIndicies() { + return missingBlockIndicies; + } + + public void setMissingBlockIndices(short[] missingBlockIndicies) { + this.missingBlockIndicies = missingBlockIndicies; + } + + protected void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set<Node> excludedNodes) { try { + // TODO: new placement policy for EC considering multiple writers targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNode, liveReplicaStorages, false, + additionalReplRequired, srcNodes[0], liveReplicaStorages, false, excludedNodes, block.getNumBytes(), storagePolicySuite.getPolicy(bc.getStoragePolicyID())); } finally { - srcNode.decrementPendingReplicationWithoutTargets(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 334a416..9f2a4de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.Arrays; import com.google.common.annotations.VisibleForTesting; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -97,6 +99,33 @@ public class DatanodeDescriptor extends DatanodeInfo { } } + /** Block and targets pair */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public static class BlockECRecoveryInfo { + public final ExtendedBlock block; + public final DatanodeDescriptor[] sources; + public final DatanodeStorageInfo[] targets; + public final short[] missingBlockIndices; + + BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources, + DatanodeStorageInfo[] targets, short[] missingBlockIndices) { + this.block = block; + this.sources = sources; + this.targets = targets; + this.missingBlockIndices = missingBlockIndices; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockECRecoveryInfo(\n "). 
+ append("Recovering ").append(block). + append(" From: ").append(Arrays.asList(sources)). + append(" To: ").append(Arrays.asList(targets)).append(")\n"). + toString(); + } + } + /** A BlockTargetPair queue. */ private static class BlockQueue<E> { private final Queue<E> blockq = new LinkedList<E>(); @@ -217,12 +246,17 @@ public class DatanodeDescriptor extends DatanodeInfo { private long bandwidth; /** A queue of blocks to be replicated by this datanode */ - private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>(); + private final BlockQueue<BlockTargetPair> replicateBlocks = + new BlockQueue<>(); + /** A queue of blocks to be erasure coded by this datanode */ + private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks = + new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks = - new BlockQueue<BlockInfoContiguousUnderConstruction>(); + private final BlockQueue<BlockInfoContiguousUnderConstruction> + recoverBlocks = new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ - private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>(); + private final LightWeightHashSet<Block> invalidateBlocks = + new LightWeightHashSet<>(); /* Variables for maintaining number of blocks scheduled to be written to * this storage. This count is approximate and might be slightly bigger @@ -375,6 +409,7 @@ public class DatanodeDescriptor extends DatanodeInfo { this.invalidateBlocks.clear(); this.recoverBlocks.clear(); this.replicateBlocks.clear(); + this.erasurecodeBlocks.clear(); } // pendingCached, cached, and pendingUncached are protected by the // FSN lock. @@ -592,6 +627,20 @@ public class DatanodeDescriptor extends DatanodeInfo { } /** + * Store block erasure coding work. 
+ */ + void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources, + DatanodeStorageInfo[] targets, short[] missingBlockIndicies) { + assert(block != null && sources != null && sources.length > 0); + BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, + missingBlockIndicies); + erasurecodeBlocks.offer(task); + BlockManager.LOG.debug("Adding block recovery task " + task + + "to " + getName() + ", current queue size is " + + erasurecodeBlocks.size()); + } + + /** * Store block recovery work. */ void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) { @@ -623,6 +672,13 @@ public class DatanodeDescriptor extends DatanodeInfo { } /** + * The number of work items that are pending to be erasure coded + */ + int getNumberOfBlocksToBeErasureCoded() { + return erasurecodeBlocks.size(); + } + + /** * The number of block invalidation items that are pending to * be sent to the datanode */ @@ -636,6 +692,10 @@ public class DatanodeDescriptor extends DatanodeInfo { return replicateBlocks.poll(maxTransfers); } + public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) { + return erasurecodeBlocks.poll(maxTransfers); + } + public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) @@ -836,6 +896,10 @@ public class DatanodeDescriptor extends DatanodeInfo { if (repl > 0) { sb.append(" ").append(repl).append(" blocks to be replicated;"); } + int ec = erasurecodeBlocks.size(); + if(ec > 0) { + sb.append(" ").append(ec).append(" blocks to be erasure coded;"); + } int inval = invalidateBlocks.size(); if (inval > 0) { sb.append(" ").append(inval).append(" blocks to be invalidated;"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index f68c4fd..6228f86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -1344,7 +1345,7 @@ public class DatanodeManager { VolumeFailureSummary volumeFailureSummary) throws IOException { synchronized (heartbeatManager) { synchronized (datanodeMap) { - DatanodeDescriptor nodeinfo = null; + DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); } catch(UnregisteredNodeException e) { @@ -1382,10 +1383,10 @@ public class DatanodeManager { final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). 
final List<DatanodeStorageInfo> recoveryLocations = - new ArrayList<DatanodeStorageInfo>(storages.length); - for (int i = 0; i < storages.length; i++) { - if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) { - recoveryLocations.add(storages[i]); + new ArrayList<>(storages.length); + for (DatanodeStorageInfo storage : storages) { + if (!storage.getDatanodeDescriptor().isStale(staleInterval)) { + recoveryLocations.add(storage); } } // If we are performing a truncate recovery than set recovery fields @@ -1424,7 +1425,7 @@ public class DatanodeManager { return new DatanodeCommand[] { brCommand }; } - final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(); + final List<DatanodeCommand> cmds = new ArrayList<>(); //check pending replication List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( maxTransfers); @@ -1432,6 +1433,13 @@ public class DatanodeManager { cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)); } + // checking pending erasure coding tasks + List<BlockECRecoveryInfo> pendingECList = + nodeinfo.getErasureCodeCommand(maxTransfers); + if (pendingECList != null) { + cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC, + pendingECList)); + } //check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); if (blks != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 3474c09..f9a81e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -407,6 +407,7 @@ public class INodeFile extends INodeWithAdditionalFields } @Override // BlockCollection + // TODO: rename to reflect both replication and EC public short getBlockReplication() { short max = getFileReplication(CURRENT_STATE_ID); FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature(); @@ -417,7 +418,8 @@ public class INodeFile extends INodeWithAdditionalFields } max = maxInSnapshot > max ? maxInSnapshot : max; } - return max; + return isStriped()? + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max; } /** Set the replication factor of this file. */ @@ -1077,11 +1079,12 @@ public class INodeFile extends INodeWithAdditionalFields Arrays.asList(snapshotBlocks).contains(block); } - @VisibleForTesting /** * @return true if the file is in the striping layout. */ - // TODO: move erasure coding policy to file XAttr (HDFS-7337) + @VisibleForTesting + @Override + // TODO: move erasure coding policy to file XAttr public boolean isStriped() { return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java new file mode 100644 index 0000000..f7f02fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.base.Joiner; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; + +import java.util.Collection; + +/** + * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a + * striped block group with missing blocks. + * + * Upon receiving this command, the DataNode pulls data from other DataNodes + * hosting blocks in this group and reconstructs the lost blocks through codec + * calculation. + * + * After the reconstruction, the DataNode pushes the reconstructed blocks to + * their final destinations if necessary (e.g., the destination is different + * from the reconstruction node, or multiple blocks in a group are to be + * reconstructed). 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockECRecoveryCommand extends DatanodeCommand { + final Collection<BlockECRecoveryInfo> ecTasks; + + /** + * Create BlockECRecoveryCommand from a collection of + * {@link BlockECRecoveryInfo}, each representing a recovery task + */ + public BlockECRecoveryCommand(int action, + Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) { + super(action); + this.ecTasks = blockECRecoveryInfoList; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("BlockECRecoveryCommand(\n "); + Joiner.on("\n ").appendTo(sb, ecTasks); + sb.append("\n)"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index a3b6004..b8ac165 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,6 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks + final static int DNA_CODEC = 11; // erasure coding recovery command /** * Register Datanode. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 148135b..e25ee31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -161,7 +161,7 @@ public class BlockManagerTestUtil { */ public static int computeAllPendingWork(BlockManager bm) { int work = computeInvalidationWork(bm); - work += bm.computeReplicationWork(Integer.MAX_VALUE); + work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE); return work; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index ca7055c..89c269e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -448,8 +448,8 @@ public class TestBlockManager { assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( - 
"computeReplicationWork should indicate replication is needed", 1, - bm.computeReplicationWorkForBlocks(list_all)); + "computeBlockRecoveryWork should indicate replication is needed", 1, + bm.computeRecoveryWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); @@ -503,22 +503,22 @@ public class TestBlockManager { assertNotNull("Chooses source node for a highest-priority replication" + " even if all available source nodes have reached their replication" + " limits below the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); + null, 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); assertNull("Does not choose a source node for a less-than-highest-priority" + " replication since all available source nodes have reached" + " their replication limits.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)); + null, 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; @@ -526,12 +526,12 @@ public class TestBlockManager { assertNull("Does not choose a source node for a highest-priority" + " replication when all available nodes exceed the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); + null, 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); } @Test 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25806c07/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java new file mode 100644 index 0000000..d883c9b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME; +import static org.junit.Assert.assertTrue; + +public class TestRecoverStripedBlocks { + private final short GROUP_SIZE = + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; + private final short NUM_OF_DATANODES = GROUP_SIZE + 1; + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private static final int BLOCK_SIZE = 1024; + private HdfsAdmin dfsAdmin; + private FSNamesystem namesystem; + private Path ECFilePath; + + @Before + public void setupCluster() throws IOException { + conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + // Large value to make sure the pending replication request can stay in + // DatanodeDescriptor.replicateBlocks before test timeout. + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); + // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via + // chooseUnderReplicatedBlocks at once. + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); + + cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(NUM_OF_DATANODES).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + dfsAdmin = new HdfsAdmin(cluster.getURI(), conf); + namesystem = cluster.getNamesystem(); + ECFilePath = new Path("/ecfile"); + DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0); + dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME); + } + + @Test + public void testMissingStripedBlock() throws Exception { + final BlockManager bm = cluster.getNamesystem().getBlockManager(); + ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath); + Iterator<DatanodeStorageInfo> storageInfos = + bm.blocksMap.getStorages(b.getLocalBlock()) + .iterator(); + + DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor(); + Iterator<BlockInfo> it = firstDn.getBlockIterator(); + int missingBlkCnt = 0; + while (it.hasNext()) { + BlockInfo blk = it.next(); + BlockManager.LOG.debug("Block " + blk + " will be lost"); + missingBlkCnt++; + } + BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks"); + + bm.getDatanodeManager().removeDatanode(firstDn); + + bm.computeDatanodeWork(); + + short cnt = 0; + for (DataNode dn : cluster.getDataNodes()) { + DatanodeDescriptor dnDescriptor = + bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid()); + cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded(); + } + + assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt); + } +}