HDFS-8827. Erasure Coding: Fix NPE when NameNode processes over-replicated striped blocks. Contributed by Walter Su and Takuya Fukudome.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ece8fd3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ece8fd3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ece8fd3

Branch: refs/heads/HDFS-7285-REBASE
Commit: 3ece8fd39d01d8adb22a3fd138ddea9798301b00
Parents: b4f83ac
Author: Jing Zhao <[email protected]>
Authored: Fri Aug 7 11:25:20 2015 -0700
Committer: Vinayakumar B <[email protected]>
Committed: Thu Aug 13 17:32:39 2015 +0530

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../server/blockmanagement/BlockManager.java    |  22 ++-
 .../TestAddOverReplicatedStripedBlocks.java     | 152 ++++++++++++++++++-
 3 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ece8fd3/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 5ad084b..45afd2c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -391,3 +391,6 @@
 
     HDFS-8857. Erasure Coding: Fix ArrayIndexOutOfBoundsException in
     TestWriteStripedFileWithFailure. (Li Bo)
+
+    HDFS-8827. Erasure Coding: Fix NPE when NameNode processes over-replicated
+    striped blocks. (Walter Su and Takuya Fukudome via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ece8fd3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4a4331f..28cf383 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3181,14 +3181,13 @@ public class BlockManager implements BlockStatsMXBean {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-        bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
     if (storedBlock.isStriped()) {
-      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint,
-          excessTypes);
+      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint);
     } else {
+      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+          bc.getStoragePolicyID());
+      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
       chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
           replication, addedNode, delNodeHint, excessTypes);
     }
@@ -3262,8 +3261,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private void chooseExcessReplicasStriped(BlockCollection bc,
       final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
-      DatanodeDescriptor delNodeHint,
-      List<StorageType> excessTypes) {
+      DatanodeDescriptor delNodeHint) {
     assert storedBlock instanceof BlockInfoStriped;
     BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
     short groupSize = sblk.getTotalBlockNum();
@@ -3283,6 +3281,14 @@ public class BlockManager implements BlockStatsMXBean {
       found.set(index);
       storage2index.put(storage, index);
     }
+    // the number of target left replicas equals to the of number of the found
+    // indices.
+    int numOfTarget = found.cardinality();
+
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        (short)numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
 
     // use delHint only if delHint is duplicated
     final DatanodeStorageInfo delStorageHint =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ece8fd3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index eaf3435..337911d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -24,9 +24,14 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.junit.After;
 import org.junit.Before;
@@ -35,6 +40,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -49,7 +55,7 @@ public class TestAddOverReplicatedStripedBlocks {
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
   private final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final int NUM_STRIPE_PER_BLOCK = 1;
+  private final int NUM_STRIPE_PER_BLOCK = 4;
   private final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
   private final int numDNs = GROUP_SIZE + 3;
 
@@ -57,6 +63,8 @@ public class TestAddOverReplicatedStripedBlocks {
   public void setup() throws IOException {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // disable block recovery
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
@@ -113,4 +121,146 @@ public class TestAddOverReplicatedStripedBlocks {
         filePath.toString(), 0, fileLen);
     DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
+
+  @Test
+  public void testProcessOverReplicatedSBSmallerThanFullBlocks()
+      throws Exception {
+    // Create a EC file which doesn't fill full internal blocks.
+    int fileLen = CELLSIZE * (DATA_BLK_NUM - 1);
+    byte[] content = new byte[fileLen];
+    DFSTestUtil.writeFile(fs, filePath, new String(content));
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    cluster.triggerBlockReports();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+
+    // let a internal block be over replicated with 2 redundant blocks.
+    // Therefor number of internal blocks is over GROUP_SIZE. (5 data blocks +
+    // 3 parity blocks + 2 redundant blocks > GROUP_SIZE)
+    blk.setBlockId(groupId + 2);
+    List<DataNode> dataNodeList = cluster.getDataNodes();
+    for (int i = 0; i < numDNs; i++) {
+      if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
+        cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+        System.out.println("XXX: inject block into datanode " + i);
+      }
+    }
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exists
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndCorruptStripedBlock()
+      throws Exception {
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    BlockInfoStriped blockInfo = new BlockInfoStriped(blk,
+        ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let a internal block be corrupt
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
+    List<String> storages = Arrays.asList(bg.getStorageIDs());
+    cluster.getNamesystem().writeLock();
+    try {
+      bm.findAndMarkBlockAsCorrupt(lbs.getLastLocatedBlock().getBlock(),
+          infos.get(0), storages.get(0), "TEST");
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+    assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());
+
+    // let a internal block be over replicated with 2 redundant block.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exists
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+
+  @Test
+  public void testProcessOverReplicatedAndMissingStripedBlock()
+      throws Exception {
+    long fileLen = CELLSIZE * DATA_BLK_NUM;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    // only inject GROUP_SIZE - 1 blocks, so there is one block missing
+    for (int i = 0; i < GROUP_SIZE - 1; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let a internal block be over replicated with 2 redundant blocks.
+    // Therefor number of internal blocks is over GROUP_SIZE. (5 data blocks +
+    // 3 parity blocks + 2 redundant blocks > GROUP_SIZE)
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // Since one block is missing, when over-replicated blocks got deleted,
+    // we are left GROUP_SIZE - 1 blocks.
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+  }
+
 }
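In short, chooseExcessReplicas() no longer computes the storage policy and excess storage types from the file's replication factor before branching on storedBlock.isStriped(); each branch now does its own computation, and chooseExcessReplicasStriped() sizes chooseExcess() by found.cardinality(), the number of distinct internal-block indices actually reported for the group. The following is a minimal, self-contained sketch of that counting step only; StripedExcessCountSketch, ReportedReplica and countTargetIndices are hypothetical names used for illustration and are not HDFS APIs.

// Sketch (hypothetical names, not HDFS code): each reported replica marks its
// internal-block index in a BitSet, and the cardinality -- not the file's
// replication factor -- is the count handed to the excess-type selection.
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class StripedExcessCountSketch {

  /** Hypothetical stand-in for one reported replica of an internal block. */
  static class ReportedReplica {
    final int blockIndex; // index of the internal block within the group
    ReportedReplica(int blockIndex) {
      this.blockIndex = blockIndex;
    }
  }

  /** Counts the distinct internal-block indices present in the reports. */
  static short countTargetIndices(List<ReportedReplica> replicas, int groupSize) {
    BitSet found = new BitSet(groupSize);
    for (ReportedReplica r : replicas) {
      found.set(r.blockIndex);
    }
    return (short) found.cardinality();
  }

  public static void main(String[] args) {
    // Index 2 is reported three times (over-replicated), but only five
    // distinct indices exist, so only five targets would be requested.
    List<ReportedReplica> reported = Arrays.asList(
        new ReportedReplica(0), new ReportedReplica(1), new ReportedReplica(2),
        new ReportedReplica(2), new ReportedReplica(2), new ReportedReplica(3),
        new ReportedReplica(4));
    System.out.println(countTargetIndices(reported, 9)); // prints 5
  }
}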
