HDFS-6833. DirectoryScanner should not register a deleting block with memory of DataNode. Contributed by Shinichi Yamashita
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dae6d12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dae6d12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dae6d12

Branch: refs/heads/HDFS-7836
Commit: 6dae6d12ec5abb716e1501cd4e18b10ae7809b94
Parents: 06ce1d9
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Fri Mar 13 02:25:32 2015 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Fri Mar 13 02:25:32 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +
 .../hdfs/server/datanode/DirectoryScanner.java     | 20 ++++---
 .../server/datanode/fsdataset/FsDatasetSpi.java    |  5 ++
 .../impl/FsDatasetAsyncDiskService.java            | 31 +++++++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 41 +++++++++++++-
 .../server/datanode/SimulatedFSDataset.java        |  5 ++
 .../extdataset/ExternalDatasetImpl.java            |  5 ++
 .../fsdataset/impl/TestFsDatasetImpl.java          | 59 ++++++++++++++++++++
 8 files changed, 158 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 07213dd..e52b849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1139,6 +1139,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7830. DataNode does not release the volume lock when adding a volume
     fails. (Lei Xu via Colin P. Mccabe)
 
+    HDFS-6833. DirectoryScanner should not register a deleting block with
+    memory of DataNode. (Shinichi Yamashita via szetszwo)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 01f967f..61dfb14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -443,13 +443,14 @@ public class DirectoryScanner implements Runnable {
       int d = 0; // index for blockpoolReport
       int m = 0; // index for memReprot
       while (m < memReport.length && d < blockpoolReport.length) {
-        FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
-        ScanInfo info = blockpoolReport[Math.min(
-            d, blockpoolReport.length - 1)];
+        FinalizedReplica memBlock = memReport[m];
+        ScanInfo info = blockpoolReport[d];
         if (info.getBlockId() < memBlock.getBlockId()) {
-          // Block is missing in memory
-          statsRecord.missingMemoryBlocks++;
-          addDifference(diffRecord, statsRecord, info);
+          if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
+            // Block is missing in memory
+            statsRecord.missingMemoryBlocks++;
+            addDifference(diffRecord, statsRecord, info);
+          }
           d++;
           continue;
         }
@@ -495,8 +496,11 @@ public class DirectoryScanner implements Runnable {
             current.getBlockId(), current.getVolume());
       }
       while (d < blockpoolReport.length) {
-        statsRecord.missingMemoryBlocks++;
-        addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+        if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+          statsRecord.missingMemoryBlocks++;
+          addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+        }
+        d++;
       }
       LOG.info(statsRecord.toString());
     } //end for


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 10c8369..5b183e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -543,4 +543,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Check whether the block was pinned
    */
   public boolean getPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Confirm whether the block is deleting
+   */
+  public boolean isDeletingBlock(String bpid, long blockId);
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 13e854f..c1d3990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -22,7 +22,10 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +67,14 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final FsDatasetImpl fsdatasetImpl;
   private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
+  private Map<String, Set<Long>> deletedBlockIds
+      = new HashMap<String, Set<Long>>();
+  private static final int MAX_DELETED_BLOCKS = 64;
+  private int numDeletedBlocks = 0;
 
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -75,8 +83,9 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
     this.datanode = datanode;
+    this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -286,7 +295,27 @@ class FsDatasetAsyncDiskService {
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }
+      updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);
     }
   }
+
+  private synchronized void updateDeletedBlockId(ExtendedBlock block) {
+    Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
+    if (blockIds == null) {
+      blockIds = new HashSet<Long>();
+      deletedBlockIds.put(block.getBlockPoolId(), blockIds);
+    }
+    blockIds.add(block.getBlockId());
+    numDeletedBlocks++;
+    if (numDeletedBlocks == MAX_DELETED_BLOCKS) {
+      for (Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
+        String bpid = e.getKey();
+        Set<Long> bs = e.getValue();
+        fsdatasetImpl.removeDeletedBlocks(bpid, bs);
+        bs.clear();
+      }
+      numDeletedBlocks = 0;
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0f28aa4..48ac6ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -237,6 +237,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final Map<String, Set<Long>> deletingBlock;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
   final RamDiskAsyncLazyPersistService asyncLazyPersistService;
@@ -298,8 +299,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
@@ -1795,7 +1797,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + ". Parent not found for file " + f);
         continue;
       }
-      volumeMap.remove(bpid, invalidBlks[i]);
+      ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
+      addDeletingBlock(bpid, removing.getBlockId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block file " + removing.getBlockFile().getName()
+            + " is to be deleted");
+      }
     }
 
     if (v.isTransientStorage()) {
@@ -3005,5 +3012,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FileStatus fss =
         localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      return s != null ? s.contains(blockId) : false;
+    }
+  }
+
+  public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
+    synchronized (deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s != null) {
+        for (Long id : blockIds) {
+          s.remove(id);
+        }
+      }
+    }
+  }
+
+  private void addDeletingBlock(String bpid, Long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s == null) {
+        s = new HashSet<Long>();
+        deletingBlock.put(bpid, s);
+      }
+      s.add(blockId);
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8ae5415..f0dbd0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1318,5 +1318,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public boolean getPinning(ExtendedBlock b) throws IOException {
     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    throw new UnsupportedOperationException();
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index a3c9935..6653cca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -429,4 +429,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean getPinning(ExtendedBlock block) throws IOException {
     return false;
   }
+
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    return false;
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 3b47dd0..403cb2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,10 +18,14 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -29,8 +33,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
@@ -88,6 +95,8 @@ public class TestFsDatasetImpl {
   private DataNode datanode;
   private DataStorage storage;
   private FsDatasetImpl dataset;
+
+  private final static String BLOCKPOOL = "BP-TEST";
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
@@ -334,4 +343,54 @@ public class TestFsDatasetImpl {
 
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
+
+  @Test
+  public void testDeletingBlocks() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+
+      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      FsVolumeImpl vol = ds.getVolumes().get(0);
+
+      ExtendedBlock eb;
+      ReplicaInfo info;
+      List<Block> blockList = new ArrayList<Block>();
+      for (int i = 1; i <= 63; i++) {
+        eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
+        info = new FinalizedReplica(
+            eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+        ds.volumeMap.add(BLOCKPOOL, info);
+        info.getBlockFile().createNewFile();
+        info.getMetaFile().createNewFile();
+        blockList.add(info);
+      }
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+
+      blockList.clear();
+      eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      ds.volumeMap.add(BLOCKPOOL, info);
+      info.getBlockFile().createNewFile();
+      info.getMetaFile().createNewFile();
+      blockList.add(info);
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }