HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica invalidation.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b00c684 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b00c684 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b00c684 Branch: refs/heads/YARN-2928 Commit: 6b00c684bc7cfe446f6062af796d7b8334f47a8d Parents: e4b8017 Author: Andrew Wang <[email protected]> Authored: Wed Jun 24 14:42:33 2015 -0700 Committer: Zhijie Shen <[email protected]> Committed: Mon Jun 29 10:28:25 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../server/blockmanagement/BlockManager.java | 14 +++++++++++ .../hdfs/server/datanode/BPServiceActor.java | 6 +++-- .../hadoop/hdfs/server/datanode/DataNode.java | 17 +++++++++++-- .../hdfs/server/namenode/CacheManager.java | 24 ++++++++++++++++--- .../hdfs/server/namenode/FSNamesystem.java | 1 + .../hadoop/hdfs/server/namenode/Namesystem.java | 18 +++++++------- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 18 ++++++++++++++ .../hdfs/server/datanode/DataNodeTestUtils.java | 11 +++++++++ .../server/namenode/TestCacheDirectives.java | 25 ++++++++++++++++++++ 10 files changed, 122 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d94a213..4268154 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -949,6 +949,9 @@ Release 2.8.0 - UNRELEASED HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification. (Kanaka Kumar Avvaru via jghoman) + HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica + invalidation. 
(wang) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7d3a678..368d3b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -3108,6 +3109,19 @@ public class BlockManager { return; } + CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() + .get(new CachedBlock(block.getBlockId(), (short) 0, false)); + if (cblock != null) { + boolean removed = false; + removed |= node.getPendingCached().remove(cblock); + removed |= node.getCached().remove(cblock); + removed |= node.getPendingUncached().remove(cblock); + if (removed) { + blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " + + "related lists on node {}", block, node); + } + } + // // It's possible that the block was removed because of a 
datanode // failure. If the block is still valid, check if replication is http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f84dd99..1817427 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -709,8 +709,10 @@ class BPServiceActor implements Runnable { } processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()])); - DatanodeCommand cmd = cacheReport(); - processCommand(new DatanodeCommand[]{ cmd }); + if (!dn.areCacheReportsDisabledForTests()) { + DatanodeCommand cmd = cacheReport(); + processCommand(new DatanodeCommand[]{ cmd }); + } // // There is no work to do; sleep until hearbeat timer elapses, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 6c8cf2b..e265dad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -301,6 +301,7 @@ public class DataNode extends ReconfigurableBase ThreadGroup 
threadGroup = null; private DNConf dnConf; private volatile boolean heartbeatsDisabledForTests = false; + private volatile boolean cacheReportsDisabledForTests = false; private DataStorage storage = null; private DatanodeHttpServer httpServer = null; @@ -1055,15 +1056,27 @@ public class DataNode extends ReconfigurableBase // used only for testing + @VisibleForTesting void setHeartbeatsDisabledForTests( boolean heartbeatsDisabledForTests) { this.heartbeatsDisabledForTests = heartbeatsDisabledForTests; } - + + @VisibleForTesting boolean areHeartbeatsDisabledForTests() { return this.heartbeatsDisabledForTests; } - + + @VisibleForTesting + void setCacheReportsDisabledForTest(boolean disabled) { + this.cacheReportsDisabledForTests = disabled; + } + + @VisibleForTesting + boolean areCacheReportsDisabledForTests() { + return this.cacheReportsDisabledForTests; + } + /** * This method starts the data node with the specified conf. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index e5270ad..e09ba32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; @@ -902,9 +903,26 @@ public final class CacheManager { if (cachedBlock == null) { return; } - List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED); - for (DatanodeDescriptor datanode : datanodes) { - block.addCachedLoc(datanode); + List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED); + for (DatanodeDescriptor datanode : cachedDNs) { + // Filter out cached blocks that do not have a backing replica. + // + // This should not happen since it means the CacheManager thinks + // something is cached that does not exist, but it's a safety + // measure. + boolean found = false; + for (DatanodeInfo loc : block.getLocations()) { + if (loc.equals(datanode)) { + block.addCachedLoc(loc); + found = true; + break; + } + } + if (!found) { + LOG.warn("Datanode {} is not a valid cache location for block {} " + + "because that node does not have a backing replica!", + datanode, block.getBlock().getBlockName()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d82da93..b073a89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -6460,6 +6460,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, this.dir = dir; } /** @return the cache manager. 
*/ + @Override public CacheManager getCacheManager() { return cacheManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index 40c4765..1732865 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -29,21 +29,23 @@ import org.apache.hadoop.security.AccessControlException; @InterfaceAudience.Private public interface Namesystem extends RwLock, SafeMode { /** Is this name system running? */ - public boolean isRunning(); + boolean isRunning(); /** Check if the user has superuser privilege. 
*/ - public void checkSuperuserPrivilege() throws AccessControlException; + void checkSuperuserPrivilege() throws AccessControlException; /** @return the block pool ID */ - public String getBlockPoolId(); + String getBlockPoolId(); - public boolean isInStandbyState(); + boolean isInStandbyState(); - public boolean isGenStampInFuture(Block block); + boolean isGenStampInFuture(Block block); - public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal); + void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal); - public void checkOperation(OperationCategory read) throws StandbyException; + void checkOperation(OperationCategory read) throws StandbyException; - public boolean isInSnapshot(BlockInfoUnderConstruction blockUC); + boolean isInSnapshot(BlockInfoUnderConstruction blockUC); + + CacheManager getCacheManager(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index d06b024..96fb669 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -79,6 +79,7 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FsShell; @@ -526,6 +527,23 @@ public class DFSTestUtil { } } + public static void waitForReplication(final DistributedFileSystem dfs, + final Path file, 
final short replication, int waitForMillis) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + FileStatus stat = dfs.getFileStatus(file); + return replication == stat.getReplication(); + } catch (IOException e) { + LOG.info("getFileStatus on path " + file + " failed!", e); + return false; + } + } + }, 100, waitForMillis); + } + /** * Keep accessing the given file until the namenode reports that the * given block in the file contains the given number of corrupt replicas. http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 9dee724..2f9a3e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -53,6 +54,16 @@ public class DataNodeTestUtils { dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests); } + /** + * Set if cache reports are disabled for all DNs in a mini cluster. 
+ */ + public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster, + boolean disabled) { + for (DataNode dn : cluster.getDataNodes()) { + dn.setCacheReportsDisabledForTest(disabled); + } + } + public static void triggerDeletionReport(DataNode dn) throws IOException { for (BPOfferService bpos : dn.getAllBpOs()) { bpos.triggerDeletionReportForTests(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b00c684/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java index 6027934..cf00405 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; @@ -1510,4 +1511,28 @@ public class TestCacheDirectives { Thread.sleep(1000); checkPendingCachedEmpty(cluster); } + + @Test(timeout=60000) + public void testNoBackingReplica() throws Exception { + // Cache all three replicas for a file. 
+ final Path filename = new Path("/noback"); + final short replication = (short) 3; + DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC); + dfs.addCachePool(new CachePoolInfo("pool")); + dfs.addCacheDirective( + new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename) + .setReplication(replication).build()); + waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1"); + // Pause cache reports while we change the replication factor. + // This will orphan some cached replicas. + DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true); + try { + dfs.setReplication(filename, (short) 1); + DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000); + // The cache locations should drop down to 1 even without cache reports. + waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2"); + } finally { + DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false); + } + } }
