HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.
Change-Id: I51703a61c9d8454f883028f3f6acb5729fde1b15 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61ab0440 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61ab0440 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61ab0440 Branch: refs/heads/yarn-2877 Commit: 61ab0440f7eaff0f631cbae0378403912f88d7ad Parents: 85c2466 Author: Zhe Zhang <z...@apache.org> Authored: Fri Dec 18 15:57:48 2015 -0800 Committer: Zhe Zhang <z...@apache.org> Committed: Fri Dec 18 15:57:48 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hdfs/DFSStripedOutputStream.java | 4 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 37 ++- .../server/blockmanagement/DatanodeManager.java | 12 +- .../server/datanode/BlockRecoveryWorker.java | 238 ++++++++++++++++- .../hdfs/server/namenode/FSNamesystem.java | 58 ++--- .../server/protocol/BlockRecoveryCommand.java | 33 +++ .../hadoop-hdfs/src/main/proto/HdfsServer.proto | 6 +- .../apache/hadoop/hdfs/StripedFileTestUtil.java | 10 +- .../hadoop/hdfs/TestLeaseRecoveryStriped.java | 257 +++++++++++++++++++ .../hdfs/server/datanode/TestBlockRecovery.java | 34 ++- .../TestCommitBlockSynchronization.java | 5 +- .../namenode/TestRecoverStripedBlocks.java | 5 +- 13 files changed, 634 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 546fa92..e1ff844 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; @@ -970,7 +971,8 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - private void enqueueAllCurrentPackets() throws IOException { + @VisibleForTesting + void enqueueAllCurrentPackets() throws IOException { int idx = streamers.indexOf(getCurrentStreamer()); for(int i = 0; i < streamers.size(); i++) { final StripedDataStreamer si = setCurrentStreamer(i); http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 76d5c24..2286203 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -873,6 +873,9 @@ Trunk (Unreleased) HDFS-9373. Erasure coding: friendly log information for write operations with some failed streamers. (Li Bo via zhz) + HDFS-9173. 
Erasure Coding: Lease recovery for striped file. (Walter Su and + Jing Zhao via zhz) + HDFS-9451. Clean up depreated umasks and related unit tests. (Wei-Chiu Chuang via wheat9) http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 9271e33..d7a793a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; @@ -359,6 +360,12 @@ public class PBHelper { builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp()); if(b.getNewBlock() != null) builder.setTruncateBlock(PBHelperClient.convert(b.getNewBlock())); + if (b instanceof RecoveringStripedBlock) { + RecoveringStripedBlock sb = (RecoveringStripedBlock) b; + builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( + sb.getErasureCodingPolicy())); + builder.addAllBlockIndices(asList(sb.getBlockIndices())); + } return builder.build(); } @@ -372,6 +379,16 @@ public class PBHelper { rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(), b.getNewGenStamp()); } + + if (b.hasEcPolicy()) { + List<Integer> BlockIndicesList = b.getBlockIndicesList(); + int[] indices = new int[BlockIndicesList.size()]; + for (int i = 0; i < BlockIndicesList.size(); i++) { + indices[i] = BlockIndicesList.get(i).shortValue(); + } + rBlock = new RecoveringStripedBlock(rBlock, indices, + PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy())); + } return rBlock; } @@ -823,12 +840,20 @@ public class PBHelper { build(); } - private static List<Integer> convertIntArray(short[] liveBlockIndices) { - List<Integer> liveBlockIndicesList = new ArrayList<>(); - for (short s : liveBlockIndices) { - liveBlockIndicesList.add((int) s); + private static List<Integer> asList(int[] arr) { + List<Integer> list = new ArrayList<>(arr.length); + for (int s : arr) { + list.add(s); + } + return list; + } + + private static List<Integer> asList(short[] arr) { + List<Integer> list = new ArrayList<>(arr.length); + for (int s : arr) { + list.add(s); } - return liveBlockIndicesList; + return list; } private static StorageTypesProto convertStorageTypesProto( @@ -925,7 +950,7 @@ public class PBHelper { builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); - builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + builder.addAllLiveBlockIndices(asList(liveBlockIndices)); builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy( 
blockEcRecoveryInfo.getErasureCodingPolicy())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index cdd5b9e..0c0c01a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.*; import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; @@ -504,6 +505,7 @@ public class DatanodeManager { public DatanodeStorageInfo[] getDatanodeStorageInfos( DatanodeID[] datanodeID, String[] storageIDs, String format, Object... args) throws UnregisteredNodeException { + storageIDs = storageIDs == null ? new String[0] : storageIDs; if (datanodeID.length != storageIDs.length) { final String err = (storageIDs.length == 0? "Missing storageIDs: It is likely that the HDFS client," @@ -524,9 +526,11 @@ public class DatanodeManager { continue; } final DatanodeDescriptor dd = getDatanode(datanodeID[i]); - storages[i] = dd.getStorageInfo(storageIDs[i]); + if (dd != null) { + storages[i] = dd.getStorageInfo(storageIDs[i]); + } } - return storages; + return storages; } /** Prints information about all datanodes. 
*/ @@ -1366,6 +1370,10 @@ public class DatanodeManager { } else { rBlock = new RecoveringBlock(primaryBlock, recoveryInfos, uc.getBlockRecoveryId()); + if (b.isStriped()) { + rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(), + ((BlockInfoStriped) b).getErasureCodingPolicy()); + } } brCommand.add(rBlock); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java index 4ad1168..db0c6ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java @@ -17,16 +17,21 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.ipc.RemoteException; @@ -37,7 +42,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; /** * This class handles the block recovery work commands. @@ -78,6 +88,10 @@ public class BlockRecoveryWorker { newLength); } + public ReplicaRecoveryInfo getReplicaRecoveryInfo(){ + return rInfo; + } + @Override public String toString() { return "block:" + rInfo + " node:" + id; @@ -294,12 +308,8 @@ public class BlockRecoveryWorker { // we never know the actual state of the replica on failed data-nodes. // The recovery should be started over. if (!failedList.isEmpty()) { - StringBuilder b = new StringBuilder(); - for(DatanodeID id : failedList) { - b.append("\n " + id); - } - throw new IOException("Cannot recover " + block + ", the following " - + failedList.size() + " data-nodes failed {" + b + "\n}"); + throw new IOException("Cannot recover " + block + + ", the following datanodes failed: " + failedList); } // Notify the name-node about successfully recovered replicas. 
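The RecoveryTaskStriped hunk that follows addresses each internal block by adding its block index to the block group ID, and later recovers the index by masking the low bits of an internal block ID. A minimal self-contained sketch of that indexing convention is below; the class and method names are illustrative only, and it assumes the usual HDFS striping layout in which HdfsServerConstants.BLOCK_GROUP_INDEX_MASK covers the low 4 bits (value 15).

// BlockGroupIndexSketch.java -- illustrative sketch, not part of the patch.
// Internal block i of a striped block group has blockId = groupId + i, so the
// index can be recovered from any internal block ID with a bit mask.
public class BlockGroupIndexSketch {
  // Assumed to match HdfsServerConstants.BLOCK_GROUP_INDEX_MASK (low 4 bits).
  static final long BLOCK_GROUP_INDEX_MASK = 15;

  static long internalBlockId(long groupId, int blockIndex) {
    // e.g. the first parity block of an RS-6-3 group is groupId + 6
    return groupId + blockIndex;
  }

  static int blockIndexOf(long internalBlockId) {
    return (int) (internalBlockId & BLOCK_GROUP_INDEX_MASK);
  }

  public static void main(String[] args) {
    long groupId = -1024L;                 // arbitrary example ID, low 4 bits zero
    long parityBlk6 = internalBlockId(groupId, 6);
    System.out.println(blockIndexOf(parityBlk6)); // prints 6
  }
}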
@@ -323,6 +333,215 @@ public class BlockRecoveryWorker { } } + /** + * blk_0 blk_1 blk_2 blk_3 blk_4 blk_5 blk_6 blk_7 blk_8 + * 64k 64k 64k 64k 64k 64k 64k 64k 64k <-- stripe_0 + * 64k 64k 64k 64k 64k 64k 64k 64k 64k + * 64k 64k 64k 64k 64k 64k 64k 61k <-- startStripeIdx + * 64k 64k 64k 64k 64k 64k 64k + * 64k 64k 64k 64k 64k 64k 59k + * 64k 64k 64k 64k 64k 64k + * 64k 64k 64k 64k 64k 64k <-- last full stripe + * 64k 64k 13k 64k 55k 3k <-- target last stripe + * 64k 64k 64k 1k + * 64k 64k 58k + * 64k 64k + * 64k 19k + * 64k <-- total visible stripe + * + * Due to different speed of streamers, the internal blocks in a block group + * could have different lengths when the block group isn't ended normally. + * The purpose of this class is to recover the UnderConstruction block group, + * so all internal blocks end at the same stripe. + * + * The steps: + * 1. get all blocks lengths from DataNodes. + * 2. calculate safe length, which is at the target last stripe. + * 3. decode and feed blk_6~8, make them end at last full stripe. (the last + * full stripe means the last decodable stripe.) + * 4. encode the target last stripe, with the remaining sequential data. In + * this case, the sequential data is 64k+64k+13k. Feed blk_6~8 the parity cells. + * Overwrite the parity cell if have to. + * 5. truncate the stripes from visible stripe, to target last stripe. + * TODO: implement step 3,4 + */ + public class RecoveryTaskStriped { + private final RecoveringBlock rBlock; + private final ExtendedBlock block; + private final String bpid; + private final DatanodeInfo[] locs; + private final long recoveryId; + + private final int[] blockIndices; + private final ErasureCodingPolicy ecPolicy; + + RecoveryTaskStriped(RecoveringStripedBlock rBlock) { + this.rBlock = rBlock; + // TODO: support truncate + Preconditions.checkArgument(rBlock.getNewBlock() == null); + + block = rBlock.getBlock(); + bpid = block.getBlockPoolId(); + locs = rBlock.getLocations(); + recoveryId = rBlock.getNewGenerationStamp(); + blockIndices = rBlock.getBlockIndices(); + ecPolicy = rBlock.getErasureCodingPolicy(); + } + + protected void recover() throws IOException { + checkLocations(locs.length); + + Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length); + final int dataBlkNum = ecPolicy.getNumDataUnits(); + final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits(); + //check generation stamps + for (int i = 0; i < locs.length; i++) { + DatanodeID id = locs[i]; + try { + DatanodeID bpReg = new DatanodeID( + datanode.getBPOfferService(bpid).bpRegistration); + InterDatanodeProtocol proxyDN = bpReg.equals(id) ? + datanode : DataNode.createInterDataNodeProtocolProxy(id, conf, + dnConf.socketTimeout, dnConf.connectToDnViaHostname); + ExtendedBlock internalBlk = new ExtendedBlock(block); + final long blockId = block.getBlockId() + blockIndices[i]; + internalBlk.setBlockId(blockId); + ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, + new RecoveringBlock(internalBlk, null, recoveryId)); + + if (info != null && + info.getGenerationStamp() >= block.getGenerationStamp() && + info.getNumBytes() > 0) { + final BlockRecord existing = syncBlocks.get(blockId); + if (existing == null || + info.getNumBytes() > existing.rInfo.getNumBytes()) { + // if we have >1 replicas for the same internal block, we + // simply choose the one with larger length. 
+ // TODO: better usage of redundant replicas + syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info)); + } + } + } catch (RecoveryInProgressException ripE) { + InterDatanodeProtocol.LOG.warn( + "Recovery for replica " + block + " on data-node " + id + + " is already in progress. Recovery id = " + + rBlock.getNewGenerationStamp() + " is aborted.", ripE); + return; + } catch (IOException e) { + InterDatanodeProtocol.LOG.warn( + "Failed to obtain replica info for block (=" + block + + ") from datanode (=" + id + ")", e); + } + } + checkLocations(syncBlocks.size()); + + final long safeLength = getSafeLength(syncBlocks); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering block " + block + + ", length=" + block.getNumBytes() + ", safeLength=" + safeLength + + ", syncList=" + syncBlocks); + } + + // If some internal blocks reach the safe length, convert them to RUR + List<BlockRecord> rurList = new ArrayList<>(locs.length); + for (BlockRecord r : syncBlocks.values()) { + int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK); + long newSize = getInternalBlockLength(safeLength, ecPolicy.getCellSize(), + dataBlkNum, blockIndex); + if (r.rInfo.getNumBytes() >= newSize) { + rurList.add(r); + } + } + assert rurList.size() >= dataBlkNum : "incorrect safe length"; + + // Recovery the striped block by truncating internal blocks to the safe + // length. Abort if there is any failure in this step. + truncatePartialBlock(rurList, safeLength); + + // notify Namenode the new size and locations + final DatanodeID[] newLocs = new DatanodeID[totalBlkNum]; + final String[] newStorages = new String[totalBlkNum]; + for (int i = 0; i < totalBlkNum; i++) { + newLocs[blockIndices[i]] = DatanodeID.EMPTY_DATANODE_ID; + newStorages[blockIndices[i]] = ""; + } + for (BlockRecord r : rurList) { + int index = (int) (r.rInfo.getBlockId() & + HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); + newLocs[index] = r.id; + newStorages[index] = r.storageID; + } + ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(), + safeLength, recoveryId); + DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid); + nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(), + newBlock.getNumBytes(), true, false, newLocs, newStorages); + } + + private void truncatePartialBlock(List<BlockRecord> rurList, + long safeLength) throws IOException { + int cellSize = ecPolicy.getCellSize(); + int dataBlkNum = ecPolicy.getNumDataUnits(); + List<DatanodeID> failedList = new ArrayList<>(); + for (BlockRecord r : rurList) { + int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK); + long newSize = getInternalBlockLength(safeLength, cellSize, dataBlkNum, + blockIndex); + try { + r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(), + newSize); + } catch (IOException e) { + InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + + ", datanode=" + r.id + ")", e); + failedList.add(r.id); + } + } + + // If any of the data-nodes failed, the recovery fails, because + // we never know the actual state of the replica on failed data-nodes. + // The recovery should be started over. + if (!failedList.isEmpty()) { + throw new IOException("Cannot recover " + block + + ", the following datanodes failed: " + failedList); + } + } + + /** + * TODO: the current implementation depends on the assumption that the + * parity cells are only generated based on the full stripe. This is not + * true after we support hflush. 
+ */ + @VisibleForTesting + long getSafeLength(Map<Long, BlockRecord> syncBlocks) { + final int cellSize = ecPolicy.getCellSize(); + final int dataBlkNum = ecPolicy.getNumDataUnits(); + Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum); + final int stripeSize = dataBlkNum * cellSize; + long[] blockLengths = new long[syncBlocks.size()]; + int i = 0; + for (BlockRecord r : syncBlocks.values()) { + ReplicaRecoveryInfo rInfo = r.getReplicaRecoveryInfo(); + blockLengths[i++] = rInfo.getNumBytes(); + } + Arrays.sort(blockLengths); + // full stripe is a stripe has at least dataBlkNum full cells. + // lastFullStripeIdx is the index of the last full stripe. + int lastFullStripeIdx = + (int) (blockLengths[blockLengths.length - dataBlkNum] / cellSize); + return lastFullStripeIdx * stripeSize; // return the safeLength + // TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists + // such a stripe (and it must be partial). + } + + private void checkLocations(int locationCount) + throws IOException { + if (locationCount < ecPolicy.getNumDataUnits()) { + throw new IOException(block + " has no enough internal blocks" + + ", unable to start recovery. Locations=" + Arrays.asList(locs)); + } + } + } + private static void logRecoverBlock(String who, RecoveringBlock rb) { ExtendedBlock block = rb.getBlock(); DatanodeInfo[] targets = rb.getLocations(); @@ -379,8 +598,11 @@ public class BlockRecoveryWorker { for(RecoveringBlock b : blocks) { try { logRecoverBlock(who, b); - RecoveryTaskContiguous task = new RecoveryTaskContiguous(b); - task.recover(); + if (b.isStriped()) { + new RecoveryTaskStriped((RecoveringStripedBlock) b).recover(); + } else { + new RecoveryTaskContiguous(b).recover(); + } } catch (IOException e) { LOG.warn("recoverBlocks FAILED: " + b, e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b25c5f7..97cb6fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -197,7 +197,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; @@ -3285,57 +3284,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, storedBlock.setNumBytes(newlength); } - // find the DatanodeDescriptor objects - ArrayList<DatanodeDescriptor> trimmedTargets = - new ArrayList<DatanodeDescriptor>(newtargets.length); - ArrayList<String> trimmedStorages = - new ArrayList<String>(newtargets.length); - if (newtargets.length > 0) { - for (int i = 0; i < newtargets.length; ++i) { - // try to 
get targetNode - DatanodeDescriptor targetNode = - blockManager.getDatanodeManager().getDatanode(newtargets[i]); - if (targetNode != null) { - trimmedTargets.add(targetNode); - trimmedStorages.add(newtargetstorages[i]); - } else if (LOG.isDebugEnabled()) { - LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found"); - } - } - } - if ((closeFile) && !trimmedTargets.isEmpty()) { + // Find the target DatanodeStorageInfos. If not found because of invalid + // or empty DatanodeID/StorageID, the slot of same offset in dsInfos is + // null + final DatanodeStorageInfo[] dsInfos = blockManager.getDatanodeManager(). + getDatanodeStorageInfos(newtargets, newtargetstorages, + "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d", + src, oldBlock, newgenerationstamp, newlength); + + if (closeFile && dsInfos != null) { // the file is getting closed. Insert block locations into blockManager. // Otherwise fsck will report these blocks as MISSING, especially if the // blocksReceived from Datanodes take a long time to arrive. - for (int i = 0; i < trimmedTargets.size(); i++) { - DatanodeStorageInfo storageInfo = - trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); - if (storageInfo != null) { + for (int i = 0; i < dsInfos.length; i++) { + if (dsInfos[i] != null) { if(copyTruncate) { - storageInfo.addBlock(truncatedBlock, truncatedBlock); + dsInfos[i].addBlock(truncatedBlock, truncatedBlock); } else { - storageInfo.addBlock(storedBlock, storedBlock); + Block bi = new Block(storedBlock); + if (storedBlock.isStriped()) { + bi.setBlockId(bi.getBlockId() + i); + } + dsInfos[i].addBlock(storedBlock, bi); } } } } // add pipeline locations into the INodeUnderConstruction - DatanodeStorageInfo[] trimmedStorageInfos = - blockManager.getDatanodeManager().getDatanodeStorageInfos( - trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]), - trimmedStorages.toArray(new String[trimmedStorages.size()]), - "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d", - src, oldBlock, newgenerationstamp, newlength); - if(copyTruncate) { - iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos); + iFile.convertLastBlockToUC(truncatedBlock, dsInfos); } else { - iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos); + iFile.convertLastBlockToUC(storedBlock, dsInfos); if (closeFile) { blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(), storedBlock, oldGenerationStamp, oldNumBytes, - trimmedStorageInfos); + dsInfos); } } } @@ -3343,7 +3327,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if (closeFile) { if(copyTruncate) { closeFileCommitBlocks(src, iFile, truncatedBlock); - if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) { + if(!iFile.isBlockInLatestSnapshot(storedBlock)) { blockManager.removeBlock(storedBlock); } } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java index 3adc85c..8dc9d39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -76,6 +77,13 @@ public class BlockRecoveryCommand extends DatanodeCommand { this.recoveryBlock = recoveryBlock; } + public RecoveringBlock(RecoveringBlock rBlock) { + super(rBlock.getBlock(), rBlock.getLocations(), rBlock.getStorageIDs(), + rBlock.getStorageTypes()); + this.newGenerationStamp = rBlock.newGenerationStamp; + this.recoveryBlock = rBlock.recoveryBlock; + } + /** * Return the new generation stamp of the block, * which also plays role of the recovery id. @@ -92,6 +100,31 @@ public class BlockRecoveryCommand extends DatanodeCommand { } } + public static class RecoveringStripedBlock extends RecoveringBlock { + private final int[] blockIndices; + private final ErasureCodingPolicy ecPolicy; + + public RecoveringStripedBlock(RecoveringBlock rBlock, int[] blockIndices, + ErasureCodingPolicy ecPolicy) { + super(rBlock); + this.blockIndices = blockIndices; + this.ecPolicy = ecPolicy; + } + + public int[] getBlockIndices() { + return blockIndices; + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } + + @Override + public boolean isStriped() { + return true; + } + } + /** * Create empty BlockRecoveryCommand. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto index 66b2a33..453ba29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto @@ -120,6 +120,10 @@ message RecoveringBlockProto { required uint64 newGenStamp = 1; // New genstamp post recovery required LocatedBlockProto block = 2; // Block to be recovered optional BlockProto truncateBlock = 3; // New block for recovery (truncate) + + optional ErasureCodingPolicyProto ecPolicy = 4; + // block indices of striped internal blocks for each storage in LocatedBlock + repeated uint32 blockIndices = 5; } /** @@ -195,4 +199,4 @@ message NamenodeRegistrationProto { } required StorageInfoProto storageInfo = 3; // Node information optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index 9942a2d..2f54078 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -59,13 +59,11 @@ public class 
StripedFileTestUtil { public static final short NUM_DATA_BLOCKS = (short) 6; public static final short NUM_PARITY_BLOCKS = (short) 3; public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024; - public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS; - - public static final int stripesPerBlock = 4; - public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock; - public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2; - public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS; + static int stripesPerBlock = 4; + public static int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock; + static int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2; + static int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS; static byte[] generateBytes(int cnt) { byte[] bytes = new byte[cnt]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java new file mode 100644 index 0000000..38ee67a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java @@ -0,0 +1,257 @@ +package org.apache.hadoop.hdfs; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeoutException; + +public class TestLeaseRecoveryStriped { + public static final Log LOG = LogFactory.getLog(TestLeaseRecoveryStriped.class); + + private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS; + private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS; + private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; + private static final int STRIPE_SIZE = NUM_DATA_BLOCKS * CELL_SIZE; + private static final int STRIPES_PER_BLOCK = 15; + private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK; + private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS; + private static final int bytesPerChecksum = 512; + + static { + GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); + StripedFileTestUtil.stripesPerBlock = STRIPES_PER_BLOCK; + StripedFileTestUtil.blockSize = BLOCK_SIZE; + StripedFileTestUtil.BLOCK_GROUP_SIZE = 
BLOCK_GROUP_SIZE; + } + + static private final String fakeUsername = "fakeUser1"; + static private final String fakeGroup = "supergroup"; + + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + private Configuration conf; + private final Path dir = new Path("/" + this.getClass().getSimpleName()); + final Path p = new Path(dir, "testfile"); + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + dfs.mkdirs(dir); + dfs.setErasureCodingPolicy(dir, null); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + public static final int[][][] BLOCK_LENGTHS_SUITE = { + {{ 11 * CELL_SIZE,10 * CELL_SIZE, 9 * CELL_SIZE, + 8 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE, + 5 * CELL_SIZE, 4 * CELL_SIZE, 3 * CELL_SIZE}, + {36 * CELL_SIZE}}, + + {{ 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE, + 6 * CELL_SIZE, 7 * CELL_SIZE, 8 * CELL_SIZE, + 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE}, + {36 * CELL_SIZE}}, + + {{ 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE, + 5 * CELL_SIZE, 4 * CELL_SIZE, 2 * CELL_SIZE, + 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE}, + {36 * CELL_SIZE}}, + + {{ 8 * CELL_SIZE + bytesPerChecksum, + 7 * CELL_SIZE + bytesPerChecksum * 2, + 6 * CELL_SIZE + bytesPerChecksum * 2, + 5 * CELL_SIZE - bytesPerChecksum * 3, + 4 * CELL_SIZE - bytesPerChecksum * 4, + 3 * CELL_SIZE - bytesPerChecksum * 4, + 9 * CELL_SIZE, 10 * CELL_SIZE, 11 * CELL_SIZE}, + {36 * CELL_SIZE}}, + }; + + @Test + public void testLeaseRecovery() throws Exception { + for(int i=0; i < BLOCK_LENGTHS_SUITE.length; i++){ + int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0]; + int safeLength = BLOCK_LENGTHS_SUITE[i][1][0]; + try { + runTest(blockLengths, safeLength); + } catch (Throwable e){ + String msg = "failed testCase at i=" + i + ", blockLengths=" + + Arrays.toString(blockLengths) + "\n" + + StringUtils.stringifyException(e); + Assert.fail(msg); + } + } + } + + private void runTest(int[] blockLengths, int safeLength) throws Exception { + writePartialBlocks(blockLengths); + recoverLease(); + + List<Long> oldGS = new ArrayList<>(); + oldGS.add(1001L); + StripedFileTestUtil.checkData(dfs, p, safeLength, + new ArrayList<DatanodeInfo>(), oldGS); + // After recovery, storages are reported by primary DN. we should verify + // storages reported by blockReport. 
+ cluster.restartNameNode(true); + StripedFileTestUtil.checkData(dfs, p, safeLength, + new ArrayList<DatanodeInfo>(), oldGS); + } + + private void writePartialBlocks(int[] blockLengths) throws Exception { + final FSDataOutputStream out = dfs.create(p); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream) out.getWrappedStream(); + int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE; + int[] posToKill = getPosToKill(blockLengths); + int checkingPos = nextCheckingPos(posToKill, 0); + try { + for (int pos = 0; pos < length; pos++) { + out.write(StripedFileTestUtil.getByte(pos)); + if (pos == checkingPos) { + for (int index : getIndexToStop(posToKill, pos)) { + out.flush(); + stripedOut.enqueueAllCurrentPackets(); + StripedDataStreamer s = stripedOut.getStripedDataStreamer(index); + waitStreamerAllAcked(s); + waitByteSent(s, blockLengths[index]); + stopBlockStream(s); + } + checkingPos = nextCheckingPos(posToKill, pos); + } + } + } finally { + DFSTestUtil.abortStream(stripedOut); + } + } + + private int nextCheckingPos(int[] posToKill, int curPos) { + int checkingPos = Integer.MAX_VALUE; + for (int i = 0; i < posToKill.length; i++) { + if (posToKill[i] > curPos) { + checkingPos = Math.min(checkingPos, posToKill[i]); + } + } + return checkingPos; + } + + private int[] getPosToKill(int[] blockLengths) { + int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS]; + for (int i = 0; i < NUM_DATA_BLOCKS; i++) { + int numStripe = (blockLengths[i] - 1) / CELL_SIZE; + posToKill[i] = numStripe * STRIPE_SIZE + + i * CELL_SIZE + blockLengths[i] % CELL_SIZE; + if (blockLengths[i] % CELL_SIZE == 0) { + posToKill[i] += CELL_SIZE; + } + } + for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS+NUM_PARITY_BLOCKS; i++) { + Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0); + int numStripe = (blockLengths[i]) / CELL_SIZE; + posToKill[i] = numStripe * STRIPE_SIZE; + } + return posToKill; + } + + private List<Integer> getIndexToStop(int[] posToKill, int pos){ + List<Integer> indices=new LinkedList<>(); + for(int i=0;i<posToKill.length;i++){ + if(pos==posToKill[i]){ + indices.add(i); + } + } + return indices; + } + + private void waitByteSent(final StripedDataStreamer s, final long byteSent) + throws Exception { + try { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + return s.bytesSent >= byteSent; + } + }, 100, 3000); + } catch (TimeoutException e) { + throw new IOException("Timeout waiting for streamer " + s +". 
Sent=" + + s.bytesSent + ", expected="+byteSent); + } + } + + private void stopBlockStream(StripedDataStreamer s) throws Exception { + IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream(); + Whitebox.setInternalState(s, "blockStream", + new DataOutputStream(nullOutputStream)); + } + + private void recoverLease() throws Exception { + final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf); + try { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + return dfs2.recoverLease(p); + } catch (IOException e) { + return false; + } + } + }, 5000, 24000); + } catch (TimeoutException e) { + throw new IOException("Timeout waiting for recoverLease()"); + } + } + + private FileSystem getFSAsAnotherUser(final Configuration c) + throws IOException, InterruptedException { + return FileSystem.get(FileSystem.getDefaultUri(c), c, + UserGroupInformation.createUserForTesting(fakeUsername, + new String[]{fakeGroup}).getUserName()); + } + + public static void waitStreamerAllAcked(DataStreamer s) throws IOException { + long toWaitFor = s.getLastQueuedSeqno(); + s.waitForAckedSeqno(toWaitFor); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 4e9b4f4..82f5423 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -39,7 +39,9 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; @@ -68,8 +71,10 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -93,6 +98,8 @@ import org.mockito.stubbing.Answer; import com.google.common.base.Supplier; +import static org.apache.hadoop.hdfs.TestLeaseRecoveryStriped.BLOCK_LENGTHS_SUITE; + /** * This tests if sync all replicas in block recovery works correctly */ @@ -243,8 +250,7 @@ public class TestBlockRecovery { DatanodeInfo[] locs = new DatanodeInfo[]{ mock(DatanodeInfo.class), mock(DatanodeInfo.class)}; - RecoveringBlock rBlock = new RecoveringBlock(block, - locs, RECOVERY_ID); + RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID); ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2); BlockRecord record1 = new BlockRecord( DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1); @@ -745,4 +751,28 @@ public class TestBlockRecovery { assertTrue(exceptionThrown); } } + + @Test + public void testSafeLength() throws Exception { + ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager + .getSystemDefaultPolicy(); + RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock, + new int[9], ecPolicy); + BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn); + BlockRecoveryWorker.RecoveryTaskStriped recoveryTask = + recoveryWorker.new RecoveryTaskStriped(rBlockStriped); + + for (int i = 0; i < BLOCK_LENGTHS_SUITE.length; i++) { + int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0]; + int safeLength = BLOCK_LENGTHS_SUITE[i][1][0]; + Map<Long, BlockRecord> syncList = new HashMap<>(); + for (int id = 0; id < blockLengths.length; id++) { + ReplicaRecoveryInfo rInfo = new ReplicaRecoveryInfo(id, + blockLengths[id], 0, null); + syncList.put((long) id, new BlockRecord(null, null, rInfo)); + } + Assert.assertEquals("BLOCK_LENGTHS_SUITE[" + i + "]", safeLength, + recoveryTask.getSafeLength(syncList)); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java index d4c5924..0460ad1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java @@ -199,14 +199,15 @@ public class TestCommitBlockSynchronization { FSNamesystem namesystemSpy = makeNameSystemSpy(block, file); DatanodeID[] newTargets = new DatanodeID[]{ new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)}; + String[] storageIDs = new String[]{"fake-storage-ID"}; ExtendedBlock lastBlock = new ExtendedBlock(); namesystemSpy.commitBlockSynchronization( lastBlock, genStamp, length, true, - false, newTargets, null); + false, newTargets, storageIDs); // Repeat the call to make sure it returns true namesystemSpy.commitBlockSynchronization( - lastBlock, genStamp, length, true, false, newTargets, null); + lastBlock, genStamp, length, true, false, newTargets, storageIDs); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java 
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index 101601e..3a5c135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -46,7 +46,6 @@ import java.util.List;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -183,7 +182,7 @@ public class TestRecoverStripedBlocks {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, StripedFileTestUtil.blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
         .build();
     try {
@@ -191,7 +190,7 @@ public class TestRecoverStripedBlocks {
       DistributedFileSystem fs = cluster.getFileSystem();
       BlockManager bm = cluster.getNamesystem().getBlockManager();
       fs.getClient().setErasureCodingPolicy("/", null);
-      int fileLen = NUM_DATA_BLOCKS * blockSize;
+      int fileLen = NUM_DATA_BLOCKS * StripedFileTestUtil.blockSize;
       Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
       final byte[] data = new byte[fileLen];
       DFSTestUtil.writeFile(fs, p, data);
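The safe length logic above (RecoveryTaskStriped#getSafeLength, exercised by testSafeLength and BLOCK_LENGTHS_SUITE) can be checked by hand. Below is a minimal standalone sketch of the same calculation; the class name and layout constants are assumptions for illustration (RS-6-3 with 64k cells, as in the tests), not part of the patch.

// SafeLengthSketch.java -- illustrative sketch of the safe-length rule used by
// RecoveryTaskStriped: find the last stripe that still has at least dataBlkNum
// full cells, then cut every internal block back to that stripe boundary.
import java.util.Arrays;

public class SafeLengthSketch {
  static final int CELL_SIZE = 64 * 1024; // matches the test layout assumption
  static final int DATA_BLK_NUM = 6;      // RS-6-3

  static long getSafeLength(long[] internalBlockLengths) {
    long[] lens = internalBlockLengths.clone();
    Arrays.sort(lens);
    // The DATA_BLK_NUM-th largest internal block bounds the last decodable
    // ("full") stripe, because a stripe needs dataBlkNum full cells to decode.
    int lastFullStripeIdx = (int) (lens[lens.length - DATA_BLK_NUM] / CELL_SIZE);
    return (long) lastFullStripeIdx * DATA_BLK_NUM * CELL_SIZE;
  }

  public static void main(String[] args) {
    // First case of BLOCK_LENGTHS_SUITE: internal blocks of 11,10,9,8,7,6,5,4,3 cells.
    int[] cells = {11, 10, 9, 8, 7, 6, 5, 4, 3};
    long[] lens = new long[cells.length];
    for (int i = 0; i < cells.length; i++) {
      lens[i] = (long) cells[i] * CELL_SIZE;
    }
    // Sorted: 3,4,5,6,7,8,9,10,11 cells; the 6th largest is 6 cells, so the last
    // full stripe index is 6 and the safe length is 6 * 6 * 64k = 36 * CELL_SIZE,
    // which is the expected value recorded in BLOCK_LENGTHS_SUITE.
    System.out.println(getSafeLength(lens) == 36L * CELL_SIZE); // prints true
  }
}

Because the safe length is a whole number of full stripes here, getInternalBlockLength then truncates every surviving internal block, data and parity alike, to lastFullStripeIdx * CELL_SIZE (6 * 64k in this example) before commitBlockSynchronization reports the new group length to the NameNode.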