HDFS-7225. Remove stale block invalidation work when DN re-registers with different UUID. (Zhe Zhang and Andrew Wang)
(cherry picked from commit 406c09ad1150c4971c2b7675fcb0263d40517fbf)
(cherry picked from commit 2e15754a92c6589308ccbbb646166353cc2f2456)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/014d07de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/014d07de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/014d07de

Branch: refs/heads/sjlee/hdfs-merge
Commit: 014d07de2e9b39be4b6793f0e09fcf8548570ad5
Parents: d79a584
Author: Andrew Wang <w...@apache.org>
Authored: Tue Nov 18 22:14:04 2014 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Wed Aug 12 21:32:30 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  21 ++-
 .../server/blockmanagement/DatanodeManager.java |   2 +
 .../TestComputeInvalidateWork.java              | 167 +++++++++++++++----
 4 files changed, 156 insertions(+), 37 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 47ec910..cc4d2ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -35,6 +35,9 @@ Release 2.6.1 - UNRELEASED
     HDFS-8486. DN startup may cause severe data loss. (daryn via cmccabe)
 
+    HDFS-7225. Remove stale block invalidation work when DN re-registers with
+    different UUID. 
(Zhe Zhang and Andrew Wang) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 17112bf..d26cc52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1112,6 +1112,18 @@ public class BlockManager { } /** + * Remove all block invalidation tasks under this datanode UUID; + * used when a datanode registers with a new UUID and the old one + * is wiped. 
+ */ + void removeFromInvalidates(final DatanodeInfo datanode) { + if (!namesystem.isPopulatingReplQueues()) { + return; + } + invalidateBlocks.remove(datanode); + } + + /** * Mark the block belonging to datanode as corrupt * @param blk Block to be marked as corrupt * @param dn Datanode which holds the corrupt replica @@ -3395,7 +3407,14 @@ public class BlockManager { return 0; } try { - toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn)); + DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn); + if (dnDescriptor == null) { + LOG.warn("DataNode " + dn + " cannot be found with UUID " + + dn.getDatanodeUuid() + ", removing block invalidation work."); + invalidateBlocks.remove(dn); + return 0; + } + toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor); if (toInvalidate == null) { return 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 6a52349..80965b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -593,6 +593,8 @@ public class DatanodeManager { synchronized (datanodeMap) { host2DatanodeMap.remove(datanodeMap.remove(key)); } + // Also remove all block invalidation tasks under this node + blockManager.removeFromInvalidates(new DatanodeInfo(node)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ".wipeDatanode(" + node + "): storage " + key 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java index d0edd48..fecca4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java @@ -17,66 +17,161 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.UUID; + import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.util.VersionInfo; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; /** * Test if FSNamesystem handles 
heartbeat right */ public class TestComputeInvalidateWork { + + private Configuration conf; + private final int NUM_OF_DATANODES = 3; + private MiniDFSCluster cluster; + private FSNamesystem namesystem; + private BlockManager bm; + private DatanodeDescriptor[] nodes; + + @Before + public void setup() throws Exception { + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES) + .build(); + cluster.waitActive(); + namesystem = cluster.getNamesystem(); + bm = namesystem.getBlockManager(); + nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes(); + assertEquals(nodes.length, NUM_OF_DATANODES); + } + + @After + public void teardown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + /** - * Test if {@link FSNamesystem#computeInvalidateWork(int)} + * Test if {@link BlockManager#computeInvalidateWork(int)} * can schedule invalidate work correctly */ - @Test + @Test(timeout=120000) public void testCompInvalidate() throws Exception { - final Configuration conf = new HdfsConfiguration(); - final int NUM_OF_DATANODES = 3; - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build(); + final int blockInvalidateLimit = bm.getDatanodeManager() + .blockInvalidateLimit; + namesystem.writeLock(); try { - cluster.waitActive(); - final FSNamesystem namesystem = cluster.getNamesystem(); - final BlockManager bm = namesystem.getBlockManager(); - final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit; - final DatanodeDescriptor[] nodes = bm.getDatanodeManager( - ).getHeartbeatManager().getDatanodes(); - assertEquals(nodes.length, NUM_OF_DATANODES); + for (int i=0; i<nodes.length; i++) { + for(int j=0; j<3*blockInvalidateLimit+1; j++) { + Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, + GenerationStamp.LAST_RESERVED_STAMP); + bm.addToInvalidates(block, nodes[i]); + } + } + 
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, + bm.computeInvalidateWork(NUM_OF_DATANODES+1)); + assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, + bm.computeInvalidateWork(NUM_OF_DATANODES)); + assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), + bm.computeInvalidateWork(NUM_OF_DATANODES-1)); + int workCount = bm.computeInvalidateWork(1); + if (workCount == 1) { + assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2)); + } else { + assertEquals(workCount, blockInvalidateLimit); + assertEquals(2, bm.computeInvalidateWork(2)); + } + } finally { + namesystem.writeUnlock(); + } + } + + /** + * Reformatted DataNodes will replace the original UUID in the + * {@link DatanodeManager#datanodeMap}. This tests if block + * invalidation work on the original DataNode can be skipped. + */ + @Test(timeout=120000) + public void testDatanodeReformat() throws Exception { + namesystem.writeLock(); + try { + Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP); + bm.addToInvalidates(block, nodes[0]); + // Change the datanode UUID to emulate a reformation + nodes[0].setDatanodeUuidForTesting("fortesting"); + // Since UUID has changed, the invalidation work should be skipped + assertEquals(0, bm.computeInvalidateWork(1)); + assertEquals(0, bm.getPendingDeletionBlocksCount()); + } finally { + namesystem.writeUnlock(); + } + } + + @Test(timeout=12000) + public void testDatanodeReRegistration() throws Exception { + // Create a test file + final DistributedFileSystem dfs = cluster.getFileSystem(); + final Path path = new Path("/testRR"); + // Create a file and shutdown the DNs, which populates InvalidateBlocks + DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(), + (short) NUM_OF_DATANODES, 0xED0ED0); + for (DataNode dn : cluster.getDataNodes()) { + dn.shutdown(); + } + dfs.delete(path, false); + namesystem.writeLock(); + InvalidateBlocks invalidateBlocks; + int expected = NUM_OF_DATANODES; + try { + invalidateBlocks = (InvalidateBlocks) 
Whitebox + .getInternalState(cluster.getNamesystem().getBlockManager(), + "invalidateBlocks"); + assertEquals("Expected invalidate blocks to be the number of DNs", + (long) expected, invalidateBlocks.numBlocks()); + } finally { + namesystem.writeUnlock(); + } + // Re-register each DN and see that it wipes the invalidation work + for (DataNode dn : cluster.getDataNodes()) { + DatanodeID did = dn.getDatanodeId(); + did.setDatanodeUuidForTesting(UUID.randomUUID().toString()); + DatanodeRegistration reg = new DatanodeRegistration(did, + new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE), + new ExportedBlockKeys(), + VersionInfo.getVersion()); namesystem.writeLock(); try { - for (int i=0; i<nodes.length; i++) { - for(int j=0; j<3*blockInvalidateLimit+1; j++) { - Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, - GenerationStamp.LAST_RESERVED_STAMP); - bm.addToInvalidates(block, nodes[i]); - } - } - - assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, - bm.computeInvalidateWork(NUM_OF_DATANODES+1)); - assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, - bm.computeInvalidateWork(NUM_OF_DATANODES)); - assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), - bm.computeInvalidateWork(NUM_OF_DATANODES-1)); - int workCount = bm.computeInvalidateWork(1); - if (workCount == 1) { - assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2)); - } else { - assertEquals(workCount, blockInvalidateLimit); - assertEquals(2, bm.computeInvalidateWork(2)); - } + bm.getDatanodeManager().registerDatanode(reg); + expected--; + assertEquals("Expected number of invalidate blocks to decrease", + (long) expected, invalidateBlocks.numBlocks()); } finally { - namesystem.writeUnlock(); + namesystem.writeUnlock(); } - } finally { - cluster.shutdown(); } } }