Repository: hadoop Updated Branches: refs/heads/branch-2.8 993ab15b3 -> d1e6b6db7
HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1e6b6db Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1e6b6db Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1e6b6db Branch: refs/heads/branch-2.8 Commit: d1e6b6db7cb60216ec3213e10020abf43174bac3 Parents: 993ab15 Author: Kihwal Lee <kih...@apache.org> Authored: Mon May 1 14:28:27 2017 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Mon May 1 14:28:27 2017 -0500 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 21 +++- .../blockmanagement/UnderReplicatedBlocks.java | 6 +- .../namenode/TestDecommissioningStatus.java | 111 ++++++++++++++++++- 3 files changed, 130 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1e6b6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e792caf..6ff8a62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1766,7 +1766,8 @@ public class BlockManager implements BlockStatsMXBean { * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. 
- * We do not use already decommissioned nodes as a source. + * We do not use already decommissioned nodes as a source, unless there is no + * other choice. * Otherwise we choose a random node among those that did not reach their * replication limits. However, if the replication is of the highest priority * and all nodes have reached their replication limits, we will choose a @@ -1798,6 +1799,7 @@ public class BlockManager implements BlockStatsMXBean { containingNodes.clear(); nodesContainingLiveReplicas.clear(); DatanodeDescriptor srcNode = null; + DatanodeDescriptor decommissionedSrc = null; int live = 0; int readonly = 0; int decommissioned = 0; @@ -1844,9 +1846,16 @@ public class BlockManager implements BlockStatsMXBean { // the block must not be scheduled for removal on srcNode if(excessBlocks != null && excessBlocks.contains(block)) continue; - // never use already decommissioned nodes - if(node.isDecommissioned()) + // Save the live decommissioned replica in case we need it. Such replicas + // are normally not used for replication, but if nothing else is + // available, one can be selected as a source. + if (node.isDecommissioned()) { + if (decommissionedSrc == null || + ThreadLocalRandom.current().nextBoolean()) { + decommissionedSrc = node; + } continue; + } // We got this far, current node is a reasonable choice if (srcNode == null) { @@ -1862,6 +1871,10 @@ public class BlockManager implements BlockStatsMXBean { if(numReplicas != null) numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt, excess, 0); + // Pick a live decommissioned replica, if nothing else is available. + if (live == 0 && srcNode == null && decommissionedSrc != null) { + return decommissionedSrc; + } return srcNode; } @@ -2677,7 +2690,7 @@ public class BlockManager implements BlockStatsMXBean { int curReplicaDelta; if (result == AddBlockResult.ADDED) { - curReplicaDelta = 1; + curReplicaDelta = (node.isDecommissioned()) ? 
0 : 1; if (logEveryBlock) { blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})", node, storedBlock, storedBlock.getNumBytes()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1e6b6db/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index bc2fe7a..cd559a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -310,9 +310,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> { " curPri " + curPri + " oldPri " + oldPri); } - if(oldPri != curPri) { - remove(block, oldPri); - } + // oldPri is mostly correct, but not always. If not found with oldPri, + // other levels will be searched until the block is found & removed. 
+ remove(block, oldPri); if(priorityQueues.get(curPri).add(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1e6b6db/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 02b5ebb..510e0ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -75,6 +76,7 @@ public class TestDecommissioningStatus { private static FileSystem localFileSys; private static Configuration conf; private static Path dir; + private static Logger LOG; final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes); @@ -98,7 +100,7 @@ public class TestDecommissioningStatus { conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4); - 
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1); @@ -111,6 +113,7 @@ public class TestDecommissioningStatus { cluster.getNamesystem().getBlockManager().getDatanodeManager() .setHeartbeatExpireInterval(3000); Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG); + LOG = Logger.getLogger(TestDecommissioningStatus.class); } @After @@ -431,4 +434,110 @@ public class TestDecommissioningStatus { writeConfigFile(localFileSys, excludeFile, null); dm.refreshNodes(conf); } + + @Test(timeout=120000) + public void testDecommissionLosingData() throws Exception { + ArrayList<String> nodes = new ArrayList<String>(2); + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager bm = fsn.getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); + Path file1 = new Path("decommissionLosingData.dat"); + writeFile(fileSys, file1, (short)numDatanodes); + Thread.sleep(1000); + + // Shutdown dn1 + LOG.info("Shutdown dn1"); + DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId(); + String dnName = dnID.getXferAddr(); + DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN1 = cluster.stopDataNode(1); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Shutdown dn0 + LOG.info("Shutdown dn0"); + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + dnName = dnID.getXferAddr(); + DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN0 = cluster.stopDataNode(0); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Decommission the nodes. 
+ LOG.info("Decommissioning nodes"); + writeConfigFile(localFileSys, excludeFile, nodes); + dm.refreshNodes(conf); + BlockManagerTestUtil.recheckDecommissionState(dm); + assertTrue(dnDescriptor0.isDecommissioned()); + assertTrue(dnDescriptor1.isDecommissioned()); + + // All nodes are dead and decommissioned. Blocks should be missing. + long missingBlocks = bm.getMissingBlocksCount(); + long underreplicated = bm.getUnderReplicatedBlocksCount(); + assertTrue(missingBlocks > 0); + assertTrue(underreplicated > 0); + + // Bring back dn0 + LOG.info("Bring back dn0"); + cluster.restartDataNode(stoppedDN0, true); + do { + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + } while (dnID == null); + dnDescriptor0 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor0.numBlocks() == 0) { + Thread.sleep(100); + } + + // Bring back dn1 + LOG.info("Bring back dn1"); + cluster.restartDataNode(stoppedDN1, true); + do { + dnID = cluster.getDataNodes().get(1).getDatanodeId(); + } while (dnID == null); + dnDescriptor1 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor1.numBlocks() == 0) { + Thread.sleep(100); + } + + // Blocks should still be under-replicated + Thread.sleep(2000); // Let replication monitor run + assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount()); + + // Start up two more nodes. + LOG.info("Starting two more nodes"); + cluster.startDataNodes(conf, 2, true, null, null); + cluster.waitActive(); + // Replication should fix it. + int count = 0; + while((bm.getUnderReplicatedBlocksCount() > 0 || + bm.getPendingReplicationBlocksCount() > 0) && + count++ < 10) { + Thread.sleep(1000); + } + + assertEquals(0, bm.getUnderReplicatedBlocksCount()); + assertEquals(0, bm.getPendingReplicationBlocksCount()); + assertEquals(0, bm.getMissingBlocksCount()); + + // Shutdown the extra nodes. 
+ dnID = cluster.getDataNodes().get(3).getDatanodeId(); + cluster.stopDataNode(3); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + dnID = cluster.getDataNodes().get(2).getDatanodeId(); + cluster.stopDataNode(2); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Call refreshNodes on FSNamesystem with empty exclude file to remove the + // datanode from decommissioning list and make it available again. + writeConfigFile(localFileSys, excludeFile, null); + dm.refreshNodes(conf); + fileSys.delete(file1, false); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org