HDFS-6978. Directory scanner should correctly reconcile blocks on RAM disk.
(Arpit Agarwal)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/14edbc94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/14edbc94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/14edbc94
Branch: refs/heads/branch-2
Commit: 14edbc9419bb4cc588d90d218bfe6c2d61a4e957
Parents: 6906ecc
Author: arp <[email protected]>
Authored: Fri Sep 12 22:13:18 2014 -0700
Committer: Jitendra Pandey <[email protected]>
Committed: Fri Oct 17 13:42:01 2014 -0700
----------------------------------------------------------------------
.../hdfs/server/datanode/DirectoryScanner.java | 19 ++-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 9 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 57 ++++---
.../fsdataset/impl/LazyWriteReplicaTracker.java | 12 +-
.../server/datanode/TestDirectoryScanner.java | 171 +++++++++++++++++--
.../fsdataset/impl/FsDatasetTestUtil.java | 10 ++
.../fsdataset/impl/TestLazyPersistFiles.java | 14 +-
7 files changed, 241 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index c313b04..71f976b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -83,6 +83,7 @@ public class DirectoryScanner implements Runnable {
long missingBlockFile = 0;
long missingMemoryBlocks = 0;
long mismatchBlocks = 0;
+ long duplicateBlocks = 0;
public Stats(String bpid) {
this.bpid = bpid;
@@ -440,7 +441,7 @@ public class DirectoryScanner implements Runnable {
int d = 0; // index for blockpoolReport
int m = 0; // index for memReport
while (m < memReport.length && d < blockpoolReport.length) {
- Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+ FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
ScanInfo info = blockpoolReport[Math.min(
d, blockpoolReport.length - 1)];
if (info.getBlockId() < memBlock.getBlockId()) {
@@ -468,9 +469,23 @@ public class DirectoryScanner implements Runnable {
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
+ } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+ // volumeMap record and on-disk files don't match.
+ statsRecord.duplicateBlocks++;
+ addDifference(diffRecord, statsRecord, info);
}
d++;
- m++;
+
+ if (d < blockpoolReport.length) {
+ // There may be multiple on-disk records for the same block, don't increment
+ // the memory record pointer if so.
+ ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
+ if (nextInfo.getBlockId() != info.blockId) {
+ ++m;
+ }
+ } else {
+ ++m;
+ }
}
while (m < memReport.length) {
FinalizedReplica current = memReport[m++];
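
This hunk rewrites the merge step of the scan. Both memReport (the volumeMap
view) and blockpoolReport (the on-disk view) are sorted by block id; the
in-memory pointer m now advances only once the last on-disk record for a given
block id has been consumed, so an extra on-disk copy is counted in
duplicateBlocks instead of desynchronizing the two cursors. A minimal
standalone sketch of that pointer discipline, using plain arrays of block ids
in place of the real FinalizedReplica/ScanInfo types:

    // Sketch only: merge two id-sorted reports, counting extra on-disk
    // records for block ids that the in-memory report already covers.
    static long countDuplicates(long[] memIds, long[] diskIds) {
      int m = 0, d = 0;
      long duplicates = 0;
      while (m < memIds.length && d < diskIds.length) {
        if (diskIds[d] < memIds[m]) { d++; continue; }  // on disk only
        if (diskIds[d] > memIds[m]) { m++; continue; }  // in memory only
        long id = diskIds[d];
        d++;                                  // consume one on-disk record
        if (d < diskIds.length && diskIds[d] == id) {
          duplicates++;                       // another record, same block
        } else {
          m++;                                // finished with this block id
        }
      }
      return duplicates;
    }

For memIds = {5} and diskIds = {5, 5} this returns 1, which is what the new
duplicateBlocks counter records.
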
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index f5c6ed6..c370f3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -269,15 +269,16 @@ class BlockPoolSlice {
/**
* Save the given replica to persistent storage.
*
- * @param replicaInfo
* @return The saved meta and block files, in that order.
* @throws IOException
*/
- File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+ File[] lazyPersistReplica(long blockId, long genStamp,
+ File srcMeta, File srcFile) throws IOException {
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
}
- File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+ File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+ blockId, genStamp, srcMeta, srcFile, lazypersistDir);
dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
return targetFiles;
}
@@ -510,7 +511,7 @@ class BlockPoolSlice {
* @return the replica that is retained.
* @throws IOException
*/
- private ReplicaInfo resolveDuplicateReplicas(
+ ReplicaInfo resolveDuplicateReplicas(
final ReplicaInfo replica1, final ReplicaInfo replica2,
final ReplicaMap volumeMap) throws IOException {
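
Two changes here: lazyPersistReplica() now takes the block id, generation
stamp, and source files directly instead of a ReplicaInfo, so a caller can
snapshot those values under the dataset lock and run the copy without it (see
the FsDatasetImpl hunks below); and resolveDuplicateReplicas() loses its
private modifier so checkAndUpdate() can invoke it when the scanner finds two
complete block+meta pairs. The resolution policy itself is not shown in this
hunk; the sketch below is a hypothetical stand-in that keeps the replica with
the newer generation stamp, breaking ties by block file length -- an
assumption for illustration, not the actual BlockPoolSlice logic:

    import java.io.File;

    // Hypothetical duplicate-resolution policy, for illustration only.
    final class ReplicaChoice {
      final long genStamp;
      final File blockFile;

      ReplicaChoice(long genStamp, File blockFile) {
        this.genStamp = genStamp;
        this.blockFile = blockFile;
      }

      // Pick which of two duplicate replicas to retain.
      static ReplicaChoice keep(ReplicaChoice a, ReplicaChoice b) {
        if (a.genStamp != b.genStamp) {
          return a.genStamp > b.genStamp ? a : b;  // newer genstamp wins
        }
        return a.blockFile.length() >= b.blockFile.length() ? a : b;
      }
    }
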
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 44a91c1..e243852 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -703,17 +703,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
- * Copy the block and meta files for the given block from the given
+ * Copy the block and meta files for the given block to the given
* destination.
* @return the new meta and block files.
* @throws IOException
*/
- static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
+ static File[] copyBlockFiles(long blockId, long genStamp,
+ File srcMeta, File srcFile, File destRoot)
throws IOException {
- final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
- final File dstFile = new File(destDir, replicaInfo.getBlockName());
- final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
- final File srcMeta = replicaInfo.getMetaFile();
- final File srcFile = replicaInfo.getBlockFile();
+ final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+ final File dstFile = new File(destDir, srcFile.getName());
+ final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
try {
FileUtils.copyFile(srcMeta, dstMeta);
} catch (IOException e) {
@@ -1847,10 +1846,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
- LOG.warn("Block file " + memFile.getAbsolutePath()
- + " does not match file found by scan "
- + diskFile.getAbsolutePath());
- // TODO: Should the diskFile be deleted?
+ if (diskMetaFile.exists()) {
+ if (memBlockInfo.getMetaFile().exists()) {
+ // We have two sets of block+meta files. Decide which one to
+ // keep.
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(
+ blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
+ ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+ memBlockInfo, diskBlockInfo, volumeMap);
+ }
+ } else {
+ if (!diskFile.delete()) {
+ LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+ }
+ }
}
} else {
// Block refers to a block file that does not exist.
@@ -2315,6 +2324,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
+ BlockPoolSlice bpSlice;
+ File srcFile, srcMeta;
+ long genStamp;
synchronized (FsDatasetImpl.this) {
replicaInfo = volumeMap.get(bpid, blockId);
@@ -2336,10 +2348,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId,
targetVolume);
- File[] savedFiles = targetVolume.getBlockPoolSlice(bpid)
- .lazyPersistReplica(replicaInfo);
- lazyWriteReplicaTracker.recordEndLazyPersist(
- bpid, blockId, savedFiles[0], savedFiles[1]);
+ bpSlice = targetVolume.getBlockPoolSlice(bpid);
+ srcMeta = replicaInfo.getMetaFile();
+ srcFile = replicaInfo.getBlockFile();
+ genStamp = replicaInfo.getGenerationStamp();
+ }
+
+ // Drop the FsDatasetImpl lock for the file copy.
+ File[] savedFiles =
+ bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
+
+ synchronized (FsDatasetImpl.this) {
+ lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2360,7 +2380,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
if (replicaState != null) {
- // Move the replica outside the lock.
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
}
succeeded = true;
@@ -2459,9 +2478,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
- }
- if (metaFile.delete() || !metaFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+ if (metaFile.delete() || !metaFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+ }
}
// If deletion failed then the directory scanner will cleanup the blocks
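
The lazy writer previously held the FsDatasetImpl lock across the whole file
copy. The hunks above split that into three phases: snapshot the replica's
files and generation stamp under the lock, copy with the lock released, then
reacquire the lock to record completion. A self-contained sketch of the same
pattern under assumed names (LockSplitCopy is illustrative, not an HDFS
class); it uses the same commons-io FileUtils.copyFile call as the patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    // Illustrative only: mirrors the structure of the lazy-persist path above.
    class LockSplitCopy {
      private final Object datasetLock = new Object();
      private File srcBlock, srcMeta, destDir;   // guarded by datasetLock

      File[] persist() throws IOException {
        final File block, meta, dest;
        synchronized (datasetLock) {             // phase 1: snapshot state
          block = srcBlock;
          meta = srcMeta;
          dest = destDir;
        }
        File dstBlock = new File(dest, block.getName());
        File dstMeta = new File(dest, meta.getName());
        FileUtils.copyFile(meta, dstMeta);       // phase 2: slow I/O, no lock
        FileUtils.copyFile(block, dstBlock);
        synchronized (datasetLock) {             // phase 3: record completion
          return new File[] { dstMeta, dstBlock };  // meta first, as upstream
        }
      }
    }
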
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index a8ab1b9..e8d9c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -161,9 +161,13 @@ class LazyWriteReplicaTracker {
replicaState.lazyPersistVolume = checkpointVolume;
}
+ /**
+ * @param bpid
+ * @param blockId
+ * @param savedFiles The saved meta and block files, in that order.
+ */
synchronized void recordEndLazyPersist(
- final String bpid, final long blockId,
- final File savedMetaFile, final File savedBlockFile) {
+ final String bpid, final long blockId, final File[] savedFiles) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
ReplicaState replicaState = map.get(blockId);
@@ -172,8 +176,8 @@ class LazyWriteReplicaTracker {
bpid + "; blockId=" + blockId);
}
replicaState.state = State.LAZY_PERSIST_COMPLETE;
- replicaState.savedMetaFile = savedMetaFile;
- replicaState.savedBlockFile = savedBlockFile;
+ replicaState.savedMetaFile = savedFiles[0];
+ replicaState.savedBlockFile = savedFiles[1];
if (replicasNotPersisted.peek() == replicaState) {
// Common case.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 78b9e2b..f27e117 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -31,22 +33,21 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
/**
@@ -60,22 +61,29 @@ public class TestDirectoryScanner {
private MiniDFSCluster cluster;
private String bpid;
+ private DFSClient client;
private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
private DirectoryScanner scanner = null;
private final Random rand = new Random();
private final Random r = new Random();
+ private static final int BLOCK_LENGTH = 100;
static {
- CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+ CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
}
/** create a file with a length of <code>fileLen</code> */
- private void createFile(String fileName, long fileLen) throws IOException {
+ private List<LocatedBlock> createFile(String fileNamePrefix,
+ long fileLen,
+ boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
- Path filePath = new Path(fileName);
- DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+ Path filePath = new Path("/" + fileNamePrefix + ".dat");
+ DFSTestUtil.createFile(
+ fs, filePath, isLazyPersist, 1024, fileLen,
+ BLOCK_LENGTH, (short) 1, r.nextLong(), false);
+ return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
}
/** Truncate a block file */
@@ -134,6 +142,43 @@ public class TestDirectoryScanner {
return 0;
}
+ /**
+ * Duplicate the given block on all volumes.
+ * @param blockId
+ * @throws IOException
+ */
+ private void duplicateBlock(long blockId) throws IOException {
+ synchronized (fds) {
+ ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+ for (FsVolumeSpi v : fds.getVolumes()) {
+ if (v.getStorageID().equals(b.getVolume().getStorageID())) {
+ continue;
+ }
+
+ // Volume without a copy of the block. Make a copy now.
+ File sourceBlock = b.getBlockFile();
+ File sourceMeta = b.getMetaFile();
+ String sourceRoot = b.getVolume().getBasePath();
+ String destRoot = v.getBasePath();
+
+ String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
+ String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
+
+ File destBlock = new File(destRoot, relativeBlockPath);
+ File destMeta = new File(destRoot, relativeMetaPath);
+
+ destBlock.getParentFile().mkdirs();
+ FileUtils.copyFile(sourceBlock, destBlock);
+ FileUtils.copyFile(sourceMeta, destMeta);
+
+ if (destBlock.exists() && destMeta.exists()) {
+ LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
+ LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
+ }
+ }
+ }
+ }
+
/** Get a random blockId that is not used already */
private long getFreeBlockId() {
long id = rand.nextLong();
@@ -216,6 +261,12 @@ public class TestDirectoryScanner {
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
long missingMemoryBlocks, long mismatchBlocks) throws IOException {
+ scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
+ missingMemoryBlocks, mismatchBlocks, 0);
+ }
+
+ private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+ long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
@@ -229,9 +280,92 @@ public class TestDirectoryScanner {
assertEquals(missingBlockFile, stats.missingBlockFile);
assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
assertEquals(mismatchBlocks, stats.mismatchBlocks);
+ assertEquals(duplicateBlocks, stats.duplicateBlocks);
+ }
+
+ @Test (timeout=300000)
+ public void testRetainBlockOnPersistentStorage() throws Exception {
+ cluster = new MiniDFSCluster
+ .Builder(CONF)
+ .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1)
+ .build();
+ try {
+ cluster.waitActive();
+ bpid = cluster.getNamesystem().getBlockPoolId();
+ fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
+ scanner = new DirectoryScanner(fds, CONF);
+ scanner.setRetainDiffs(true);
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ // Add a file with 1 block
+ List<LocatedBlock> blocks =
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+
+ // Ensure no difference between volumeMap and disk.
+ scan(1, 0, 0, 0, 0, 0);
+
+ // Make a copy of the block on RAM_DISK and ensure that it is
+ // picked up by the scanner.
+ duplicateBlock(blocks.get(0).getBlock().getBlockId());
+ scan(2, 1, 0, 0, 0, 0, 1);
+ verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+ scan(1, 0, 0, 0, 0, 0);
+
+ } finally {
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
+ cluster.shutdown();
+ cluster = null;
+ }
}
- @Test
+ @Test (timeout=300000)
+ public void testDeleteBlockOnTransientStorage() throws Exception {
+ cluster = new MiniDFSCluster
+ .Builder(CONF)
+ .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+ .numDataNodes(1)
+ .build();
+ try {
+ cluster.waitActive();
+ bpid = cluster.getNamesystem().getBlockPoolId();
+ fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
+ scanner = new DirectoryScanner(fds, CONF);
+ scanner.setRetainDiffs(true);
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+ // Create a file on RAM_DISK
+ List<LocatedBlock> blocks =
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
+
+ // Ensure no difference between volumeMap and disk.
+ scan(1, 0, 0, 0, 0, 0);
+
+ // Make a copy of the block on DEFAULT storage and ensure that it is
+ // picked up by the scanner.
+ duplicateBlock(blocks.get(0).getBlock().getBlockId());
+ scan(2, 1, 0, 0, 0, 0, 1);
+
+ // Ensure that the copy on RAM_DISK was deleted.
+ verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+ scan(1, 0, 0, 0, 0, 0);
+
+ } finally {
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test (timeout=600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
@@ -245,16 +379,17 @@ public class TestDirectoryScanner {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+ client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks
- createFile("/tmp/t1", 10000);
+ createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
long totalBlocks = 100;
- // Test1: No difference between in-memory and disk
+ // Test1: No difference between volumeMap and disk
scan(100, 0, 0, 0, 0, 0);
// Test2: block metafile is missing
@@ -355,7 +490,10 @@ public class TestDirectoryScanner {
assertFalse(scanner.getRunStatus());
} finally {
- scanner.shutdown();
+ if (scanner != null) {
+ scanner.shutdown();
+ scanner = null;
+ }
cluster.shutdown();
}
}
@@ -389,6 +527,13 @@ public class TestDirectoryScanner {
assertEquals(genStamp, memBlock.getGenerationStamp());
}
+ private void verifyStorageType(long blockId, boolean expectTransient) {
+ final ReplicaInfo memBlock;
+ memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+ assertNotNull(memBlock);
+ assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
+ }
+
private static class TestFsVolumeSpi implements FsVolumeSpi {
@Override
public String[] getBlockPoolList() {
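
The new duplicateBlock() helper above re-roots a block file from its source
volume onto every other volume by relativizing the file's path against the
source volume root. A standalone demonstration of that
File.toURI().relativize() technique, with made-up paths:

    import java.io.File;

    // Paths are illustrative; any root/child pair behaves the same way.
    public class RelativizeDemo {
      public static void main(String[] args) {
        File sourceRoot = new File("/data/vol1");
        File sourceBlock = new File("/data/vol1/current/bp-1/finalized/blk_1001");
        String relative =
            sourceRoot.toURI().relativize(sourceBlock.toURI()).getPath();
        File destBlock = new File("/data/vol2", relative);
        System.out.println(relative);   // current/bp-1/finalized/blk_1001
        System.out.println(destBlock);  // /data/vol2/current/bp-1/finalized/blk_1001
      }
    }
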
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index d6d7dd7..48ddcc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -62,4 +63,13 @@ public class FsDatasetTestUtil {
String bpid) {
return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
}
+
+ /**
+ * Stop the lazy writer daemon that saves RAM disk files to persistent storage.
+ * @param dn
+ */
+ public static void stopLazyWriter(DataNode dn) {
+ FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+ ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/14edbc94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index fcc4798..777779f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -368,6 +368,8 @@ public class TestLazyPersistFiles {
// Found a persisted copy for this block!
boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
assertThat(added, is(true));
+ } else {
+ LOG.error(blockFile + " not found");
}
}
}
@@ -423,7 +425,7 @@ public class TestLazyPersistFiles {
final int SEED = 0XFADED;
// Stop lazy writer to ensure block for path1 is not persisted to disk.
- stopLazyWriter(cluster.getDataNodes().get(0));
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
@@ -488,7 +490,7 @@ public class TestLazyPersistFiles {
throws IOException, InterruptedException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
- stopLazyWriter(cluster.getDataNodes().get(0));
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
@@ -682,7 +684,7 @@ public class TestLazyPersistFiles {
throws IOException, InterruptedException {
startUpCluster(true, 1);
- stopLazyWriter(cluster.getDataNodes().get(0));
+ FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -794,12 +796,6 @@ public class TestLazyPersistFiles {
return locatedBlocks;
}
- private void stopLazyWriter(DataNode dn) {
- // Stop the lazyWriter daemon.
- FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
- ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
- }
-
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,