HDFS-9958. BlockManager#createLocatedBlocks can throw NPE for corruptBlocks on failed storages. Contributed by Kuhu Shukla
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6243eabb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6243eabb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6243eabb

Branch: refs/heads/YARN-3368
Commit: 6243eabb48390fffada2418ade5adf9e0766afbe
Parents: cf2ee45
Author: Kihwal Lee <kih...@apache.org>
Authored: Thu Apr 28 12:42:28 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Thu Apr 28 12:44:53 2016 -0500

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java   | 23 ++++--
 .../apache/hadoop/hdfs/TestFileCorruption.java | 87 +++++++++++++++++++-
 2 files changed, 103 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6243eabb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 70086e6..accfc38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1038,9 +1038,9 @@ public class BlockManager implements BlockStatsMXBean {
     }

     final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptNodes != 0 &&
-        numCorruptNodes == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final boolean isCorrupt = numCorruptReplicas != 0 &&
+        numCorruptReplicas == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
     int j = 0, i = 0;
@@ -1366,11 +1366,22 @@ public class BlockManager implements BlockStatsMXBean {
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
-
+    DatanodeStorageInfo storage = null;
+    if (storageID != null) {
+      storage = node.getStorageInfo(storageID);
+    }
+    if (storage == null) {
+      storage = storedBlock.findStorageInfo(node);
+    }
+
+    if (storage == null) {
+      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
+          blk, dn);
+      return;
+    }
     markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
         blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        storageID == null ? null : node.getStorageInfo(storageID),
-        node);
+        storage, node);
   }

   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6243eabb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
index c1a7ebb..011baa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
@@ -18,15 +18,22 @@

 package org.apache.hadoop.hdfs;

+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.FileOutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
@@ -36,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -167,7 +176,83 @@ public class TestFileCorruption {
       }
     }
   }
-
+
+  @Test
+  public void testCorruptionWithDiskFailure() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      final Path FILE_PATH = new Path("/tmp.txt");
+      final long FILE_LEN = 1L;
+      DFSTestUtil.createFile(fs, FILE_PATH, FILE_LEN, (short) 3, 1L);
+
+      // get the block
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      File storageDir = cluster.getInstanceStorageDir(0, 0);
+      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+      assertTrue("Data directory does not exist", dataDir.exists());
+      ExtendedBlock blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      if (blk == null) {
+        blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
+      }
+      assertFalse("Data directory does not contain any blocks or there was an"
+          + " "
+          + "IO error", blk == null);
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 3);
+      FSNamesystem ns = cluster.getNamesystem();
+      //fail the storage on that node which has the block
+      try {
+        ns.writeLock();
+        updateAllStorages(bm);
+      } finally {
+        ns.writeUnlock();
+      }
+      ns.writeLock();
+      try {
+        markAllBlocksAsCorrupt(bm, blk);
+      } finally {
+        ns.writeUnlock();
+      }
+
+      // open the file
+      fs.open(FILE_PATH);
+
+      //clean up
+      fs.delete(FILE_PATH, false);
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+
+  }
+
+  private void markAllBlocksAsCorrupt(BlockManager bm,
+      ExtendedBlock blk) throws IOException {
+    for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
+      bm.findAndMarkBlockAsCorrupt(
+          blk, info.getDatanodeDescriptor(), info.getStorageID(), "STORAGE_ID");
+    }
+  }
+
+  private void updateAllStorages(BlockManager bm) {
+    for (DatanodeDescriptor dd : bm.getDatanodeManager().getDatanodes()) {
+      Set<DatanodeStorageInfo> setInfos = new HashSet<DatanodeStorageInfo>();
+      DatanodeStorageInfo[] infos = dd.getStorageInfos();
+      Random random = new Random();
+      for (int i = 0; i < infos.length; i++) {
+        int blkId = random.nextInt(101);
+        DatanodeStorage storage = new DatanodeStorage(Integer.toString(blkId),
+            DatanodeStorage.State.FAILED, StorageType.DISK);
+        infos[i].updateFromStorage(storage);
+        setInfos.add(infos[i]);
+      }
+    }
+  }
+
   private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
     Map<DatanodeStorage, BlockListAsLongs> blockReports =
         dn.getFSDataset().getBlockReports(bpid);
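
For readers not steeped in BlockManager internals, the pattern the patch adds to findAndMarkBlockAsCorrupt() can be read on its own: resolve the storage by the reported storage ID first, fall back to the storages already recorded for the stored block, and if neither lookup succeeds, log and return instead of passing a null storage downstream. Below is a minimal, self-contained sketch of that lookup order; the class, method, and map names are hypothetical and are not HDFS APIs.

    import java.util.Map;
    import java.util.Optional;

    // Sketch of the storage-resolution order introduced by this patch
    // (hypothetical names; the real code works on DatanodeStorageInfo).
    public class CorruptStorageLookupSketch {

      // storagesOnNode: storage ID -> storage, as known for the reporting datanode
      // blockStorages:  datanode ID -> storage, as recorded for the stored block
      static Optional<String> resolveStorage(String reportedStorageId,
          String datanodeId,
          Map<String, String> storagesOnNode,
          Map<String, String> blockStorages) {
        String storage = null;
        if (reportedStorageId != null) {
          // The reported ID may no longer resolve, e.g. the storage has failed.
          storage = storagesOnNode.get(reportedStorageId);
        }
        if (storage == null) {
          // Fall back to whatever storage the block is already recorded on.
          storage = blockStorages.get(datanodeId);
        }
        // An empty result means: log at debug level and skip the marking.
        return Optional.ofNullable(storage);
      }

      public static void main(String[] args) {
        Map<String, String> storagesOnNode = Map.of("DS-1", "disk-a");
        Map<String, String> blockStorages = Map.of("dn-1", "disk-a");

        // Reported storage is gone, but the block still records one on the node.
        System.out.println(
            resolveStorage("DS-gone", "dn-1", storagesOnNode, blockStorages));

        // Nothing found anywhere: the caller skips marking rather than
        // letting a null storage surface later as an NPE.
        System.out.println(
            resolveStorage(null, "dn-1", storagesOnNode, Map.of()));
      }
    }

The first hunk of the patch makes the matching change on the read path: createLocatedBlock() now sizes its machine array from numCorruptReplicas rather than numCorruptNodes, and the new test drives both paths by failing every storage, marking the block corrupt on each, and then reopening the file.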