Repository: hadoop Updated Branches: refs/heads/branch-2 5ca4f0f82 -> 6a6d0ece9
HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a6d0ece Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a6d0ece Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a6d0ece Branch: refs/heads/branch-2 Commit: 6a6d0ece993d1c52fce15b91180ededcdbd2637c Parents: 5ca4f0f Author: Kihwal Lee <kih...@apache.org> Authored: Mon May 1 14:20:37 2017 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Mon May 1 14:20:37 2017 -0500 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 21 +++- .../blockmanagement/UnderReplicatedBlocks.java | 6 +- .../namenode/TestDecommissioningStatus.java | 111 ++++++++++++++++++- 3 files changed, 130 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 0e5cfc9..31437b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1804,7 +1804,8 @@ public class BlockManager implements BlockStatsMXBean { * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. 
- * We do not use already decommissioned nodes as a source. + * We do not use already decommissioned nodes as a source, unless there is + * no other choice. * Otherwise we choose a random node among those that did not reach their * replication limits. However, if the replication is of the highest priority * and all nodes have reached their replication limits, we will choose a @@ -1836,6 +1837,7 @@ public class BlockManager implements BlockStatsMXBean { containingNodes.clear(); nodesContainingLiveReplicas.clear(); DatanodeDescriptor srcNode = null; + DatanodeDescriptor decommissionedSrc = null; int live = 0; int readonly = 0; int decommissioned = 0; @@ -1889,9 +1891,16 @@ public class BlockManager implements BlockStatsMXBean { // the block must not be scheduled for removal on srcNode if(excessBlocks != null && excessBlocks.contains(block)) continue; - // never use already decommissioned nodes - if(node.isDecommissioned()) + // Save the live decommissioned replica in case we need it. Such replicas + // are normally not used for replication, but if nothing else is + // available, one can be selected as a source. + if (node.isDecommissioned()) { + if (decommissionedSrc == null || + ThreadLocalRandom.current().nextBoolean()) { + decommissionedSrc = node; + } continue; + } // Don't use dead ENTERING_MAINTENANCE or IN_MAINTENANCE nodes. if((!node.isAlive() && node.isEnteringMaintenance()) || node.isInMaintenance()) { @@ -1912,6 +1921,10 @@ public class BlockManager implements BlockStatsMXBean { if(numReplicas != null) numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt, excess, 0, maintenanceNotForRead, maintenanceForRead); + // Pick a live decommissioned replica, if nothing else is available. 
+ if (live == 0 && srcNode == null && decommissionedSrc != null) { + return decommissionedSrc; + } return srcNode; } @@ -2795,7 +2808,7 @@ public class BlockManager implements BlockStatsMXBean { int curReplicaDelta; if (result == AddBlockResult.ADDED) { - curReplicaDelta = 1; + curReplicaDelta = (node.isDecommissioned()) ? 0 : 1; if (logEveryBlock) { blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})", node, storedBlock, storedBlock.getNumBytes()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index bc2fe7a..cd559a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -310,9 +310,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> { " curPri " + curPri + " oldPri " + oldPri); } - if(oldPri != curPri) { - remove(block, oldPri); - } + // oldPri is mostly correct, but not always. If not found with oldPri, + // other levels will be searched until the block is found & removed. 
+ remove(block, oldPri); if(priorityQueues.get(curPri).add(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 0ca7412..5b61b0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -74,6 +75,7 @@ public class TestDecommissioningStatus { private static FileSystem fileSys; private static HostsFileWriter hostsFileWriter; private static Configuration conf; + private Logger LOG; final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes); @@ -91,7 +93,7 @@ public class TestDecommissioningStatus { conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4); - 
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1); @@ -101,6 +103,7 @@ public class TestDecommissioningStatus { cluster.getNamesystem().getBlockManager().getDatanodeManager() .setHeartbeatExpireInterval(3000); Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG); + LOG = Logger.getLogger(TestDecommissioningStatus.class); } @After @@ -386,4 +389,110 @@ public class TestDecommissioningStatus { hostsFileWriter.initExcludeHost(""); dm.refreshNodes(conf); } + + @Test(timeout=120000) + public void testDecommissionLosingData() throws Exception { + ArrayList<String> nodes = new ArrayList<String>(2); + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager bm = fsn.getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); + Path file1 = new Path("decommissionLosingData.dat"); + writeFile(fileSys, file1, (short)numDatanodes); + Thread.sleep(1000); + + // Shutdown dn1 + LOG.info("Shutdown dn1"); + DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId(); + String dnName = dnID.getXferAddr(); + DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN1 = cluster.stopDataNode(1); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Shutdown dn0 + LOG.info("Shutdown dn0"); + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + dnName = dnID.getXferAddr(); + DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN0 = cluster.stopDataNode(0); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Decommission the nodes. 
+ LOG.info("Decommissioning nodes"); + hostsFileWriter.initExcludeHosts(nodes); + dm.refreshNodes(conf); + BlockManagerTestUtil.recheckDecommissionState(dm); + assertTrue(dnDescriptor0.isDecommissioned()); + assertTrue(dnDescriptor1.isDecommissioned()); + + // All nodes are dead and decommed. Blocks should be missing. + long missingBlocks = bm.getMissingBlocksCount(); + long underreplicated = bm.getUnderReplicatedBlocksCount(); + assertTrue(missingBlocks > 0); + assertTrue(underreplicated > 0); + + // Bring back dn0 + LOG.info("Bring back dn0"); + cluster.restartDataNode(stoppedDN0, true); + do { + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + } while (dnID == null); + dnDescriptor0 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor0.numBlocks() == 0) { + Thread.sleep(100); + } + + // Bring back dn1 + LOG.info("Bring back dn1"); + cluster.restartDataNode(stoppedDN1, true); + do { + dnID = cluster.getDataNodes().get(1).getDatanodeId(); + } while (dnID == null); + dnDescriptor1 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor1.numBlocks() == 0) { + Thread.sleep(100); + } + + // Blocks should still be under-replicated + Thread.sleep(2000); // Let replication monitor run + assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount()); + + // Start up two more nodes. + LOG.info("Starting two more nodes"); + cluster.startDataNodes(conf, 2, true, null, null); + cluster.waitActive(); + // Replication should fix it. + int count = 0; + while((bm.getUnderReplicatedBlocksCount() > 0 || + bm.getPendingReplicationBlocksCount() > 0) && + count++ < 10) { + Thread.sleep(1000); + } + + assertEquals(0, bm.getUnderReplicatedBlocksCount()); + assertEquals(0, bm.getPendingReplicationBlocksCount()); + assertEquals(0, bm.getMissingBlocksCount()); + + // Shutdown the extra nodes. 
+ dnID = cluster.getDataNodes().get(3).getDatanodeId(); + cluster.stopDataNode(3); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + dnID = cluster.getDataNodes().get(2).getDatanodeId(); + cluster.stopDataNode(2); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Call refreshNodes on FSNamesystem with empty exclude file to remove the + // datanode from decommissioning list and make it available again. + hostsFileWriter.initExcludeHost(""); + dm.refreshNodes(conf); + fileSys.delete(file1, false); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org