HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/225ffdb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/225ffdb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/225ffdb6
Branch: refs/heads/branch-2
Commit: 225ffdb6d83717529192557f45913517f6a415f4
Parents: 43ea145
Author: arp <[email protected]>
Authored: Wed Sep 3 10:51:26 2014 -0700
Committer: Jitendra Pandey <[email protected]>
Committed: Fri Oct 17 13:42:01 2014 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 45 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 543 +++++++++++++++++--
2 files changed, 537 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/225ffdb6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 8c987c4..820b812 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -94,6 +94,11 @@ import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -305,16 +310,29 @@ public class DFSTestUtil {
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
long fileLen, long blockSize, short
replFactor, long seed)
throws IOException {
- assert bufferLen > 0;
- if (!fs.mkdirs(fileName.getParent())) {
+ createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
+ replFactor, seed, false);
+ }
+
+ public static void createFile(FileSystem fs, Path fileName,
+ boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+ short replFactor, long seed, boolean flush) throws IOException {
+ assert bufferLen > 0;
+ if (!fs.mkdirs(fileName.getParent())) {
throw new IOException("Mkdirs failed to create " +
- fileName.getParent().toString());
- }
- FSDataOutputStream out = null;
- try {
- out = fs.create(fileName, true, fs.getConf()
-
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
- replFactor, blockSize);
+ fileName.getParent().toString());
+ }
+ FSDataOutputStream out = null;
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+ createFlags.add(OVERWRITE);
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+ try {
+ out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
+ fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY,
4096),
+ replFactor, blockSize, null);
+
if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed);
@@ -322,10 +340,13 @@ public class DFSTestUtil {
while (bytesToWrite>0) {
rb.nextBytes(toWrite);
int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
- : (int) bytesToWrite;
+ : (int) bytesToWrite;
- out.write(toWrite, 0, bytesToWriteNext);
- bytesToWrite -= bytesToWriteNext;
+ out.write(toWrite, 0, bytesToWriteNext);
+ bytesToWrite -= bytesToWriteNext;
+ }
+ if (flush) {
+ out.hsync();
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/225ffdb6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index cac99a7..461c44d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,46 +16,49 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
-import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Test;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
public class TestLazyPersistFiles {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
@@ -66,8 +69,10 @@ public class TestLazyPersistFiles {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
+ private static final int THREADPOOL_SIZE = 10;
+
private static short REPL_FACTOR = 1;
- private static final long BLOCK_SIZE = 10485760; // 10 MB
+ private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -162,6 +167,26 @@ public class TestLazyPersistFiles {
}
@Test (timeout=300000)
+ public void testPlacementOnSizeLimitedRamDisk() throws IOException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
+ 3 * BLOCK_SIZE -1); // 2 replicas + delta
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ makeTestFile(path1, BLOCK_SIZE, true);
+ makeTestFile(path2, BLOCK_SIZE, true);
+
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ ensureFileReplicasOnStorageType(path2, RAM_DISK);
+ }
+
+ /**
+ * Client tries to write LAZY_PERSIST to same DN with no RamDisk configured
+ * Write should default to disk. No error.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
public void testFallbackToDisk() throws IOException {
startUpCluster(REPL_FACTOR, null, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -172,6 +197,59 @@ public class TestLazyPersistFiles {
}
/**
+ * File can not fit in RamDisk even with eviction
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testFallbackToDiskFull() throws IOException {
+ startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, DEFAULT);
+ }
+
+ /**
+ * File partially fit in RamDisk after eviction.
+ * RamDisk can fit 2 blocks. Write a file with 5 blocks.
+ * Expect 2 blocks are on RamDisk whereas other 3 on disk.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testFallbackToDiskPartial()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR,
+ new StorageType[] { RAM_DISK, DEFAULT },
+ BLOCK_SIZE * 3 - 1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE * 5, true);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ triggerBlockReport();
+
+ int numBlocksOnRamDisk = 0;
+ int numBlocksOnDisk = 0;
+
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
+ numBlocksOnRamDisk++;
+ }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+ numBlocksOnDisk++;
+ }
+ }
+ assertThat(numBlocksOnRamDisk, is(2));
+ assertThat(numBlocksOnDisk, is(3));
+ }
+
+ /**
* If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
* specified, then block placement should fail.
*
@@ -191,6 +269,10 @@ public class TestLazyPersistFiles {
}
}
+ /**
+ * Append to lazy persist file is denied.
+ * @throws IOException
+ */
@Test (timeout=300000)
public void testAppendIsDenied() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
@@ -216,7 +298,7 @@ public class TestLazyPersistFiles {
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
- new StorageType[] {RAM_DISK, DEFAULT },
+ new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -256,7 +338,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -302,8 +384,13 @@ public class TestLazyPersistFiles {
assertThat(persistedBlockIds.size(),
is(locatedBlocks.getLocatedBlocks().size()));
}
-
- @Test (timeout=300000)
+ /**
+ * RamDisk eviction after lazy persist to disk.
+ * Evicted blocks are still readable with on-disk replicas.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
public void testRamDiskEviction()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
@@ -313,7 +400,8 @@ public class TestLazyPersistFiles {
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- makeTestFile(path1, BLOCK_SIZE, true);
+ final int SEED = 0xFADED;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job.
@@ -323,15 +411,268 @@ public class TestLazyPersistFiles {
// Create another file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true);
- DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
- Thread.sleep(10 * 1000);
+ triggerBlockReport();
// Make sure that the second file's block replica is on RAM_DISK, whereas
// the original file's block replica is now on disk.
- ensureFileReplicasOnStorageType(path2, RAM_DISK);
+// ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
+ /**
+ * RamDisk eviction should not happen on blocks that are not yet
+ * persisted on disk.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testRamDiskEvictionBeforePersist()
+ throws IOException, InterruptedException {
+ // 1 replica + delta, lazy persist interval every 50 minutes
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ (2 * BLOCK_SIZE - 1));
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+ final int SEED = 0XFADED;
+
+ // Stop lazy writer to ensure block for path1 is not persisted to disk.
+ stopLazyWriter(cluster.getDataNodes().get(0));
+
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Create second file with a replica on RAM_DISK.
+ makeTestFile(path2, BLOCK_SIZE, true);
+
+ // Eviction should not happen for block of the first file that is not
+ // persisted yet.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ ensureFileReplicasOnStorageType(path2, DEFAULT);
+
+ assert(fs.exists(path1));
+ assert(fs.exists(path2));
+ verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+ }
+
+ /**
+ * Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testRamDiskEvictionLRU()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ (4 * BLOCK_SIZE -1)); // 3 replica + delta.
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final int NUM_PATHS = 6;
+ Path paths[] = new Path[NUM_PATHS];
+
+ for (int i = 0; i < NUM_PATHS; i++) {
+ paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
+ }
+
+ // No eviction for the first half of files
+ for (int i = 0; i < NUM_PATHS/2; i++) {
+ makeTestFile(paths[i], BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+ }
+
+ // Lazy persist writer persists the first half of files
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ // Create the second half of files with eviction upon each create.
+ for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
+ makeTestFile(paths[i], BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+
+ // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+ triggerBlockReport();
+ ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+ }
+ }
+
+ /**
+ * Delete lazy-persist file that has not been persisted to disk.
+ * Memory is freed up and file is gone.
+ * @throws IOException
+ */
+ @Test (timeout=300000)
+ public void testDeleteBeforePersist()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ stopLazyWriter(cluster.getDataNodes().get(0));
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+ LocatedBlocks locatedBlocks =
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+ // Delete before persist
+ client.delete(path.toString(), false);
+ Assert.assertFalse(fs.exists(path));
+
+ assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+ }
+
+ /**
+ * Delete lazy-persist file that has been persisted to disk
+ * Both memory blocks and disk blocks are deleted.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testDeleteAfterPersist()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path,
RAM_DISK);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ // Delete after persist
+ client.delete(path.toString(), false);
+ Assert.assertFalse(fs.exists(path));
+
+ triggerBlockReport();
+
+ assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+ }
+
+ /**
+ * RAM_DISK used/free space
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testDfsUsageCreateDelete()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ 5 * BLOCK_SIZE - 1); // 4 replica + delta
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ // Get the usage before write BLOCK_SIZE
+ long usedBeforeCreate = fs.getUsed();
+
+ makeTestFile(path, BLOCK_SIZE, true);
+ long usedAfterCreate = fs.getUsed();
+
+ assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ long usedAfterPersist = fs.getUsed();
+ assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
+
+ // Delete after persist
+ client.delete(path.toString(), false);
+ long usedAfterDelete = fs.getUsed();
+
+ assertThat(usedBeforeCreate, is(usedAfterDelete));
+ }
+
+ /**
+ * Concurrent read from the same node and verify the contents.
+ */
+ @Test (timeout=300000)
+ public void testConcurrentRead()
+ throws Exception {
+ startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
+ 3 * BLOCK_SIZE -1); // 2 replicas + delta
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path1 = new Path("/" + METHOD_NAME + ".dat");
+
+ final int SEED = 0xFADED;
+ final int NUM_TASKS = 5;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ //Read from multiple clients
+ final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+ Runnable readerRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+ } catch (Throwable e) {
+ LOG.error("readerRunnable error", e);
+ testFailed.set(true);
+ } finally {
+ latch.countDown();
+ }
+ }
+ };
+
+ Thread threads[] = new Thread[NUM_TASKS];
+ for (int i = 0; i < NUM_TASKS; i++) {
+ threads[i] = new Thread(readerRunnable);
+ threads[i].start();
+ }
+
+ Thread.sleep(500);
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ Uninterruptibles.joinUninterruptibly(threads[i]);
+ }
+ Assert.assertFalse(testFailed.get());
+ }
+
+ /**
+ * Concurrent write with eviction
+ * RAM_DISK can hold 9 replicas
+ * 4 threads each write 5 replicas
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testConcurrentWrites()
+ throws IOException, InterruptedException {
+ startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+ (10 * BLOCK_SIZE -1)); // 9 replica + delta.
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final int SEED = 0xFADED;
+ final int NUM_WRITERS = 4;
+ final int NUM_WRITER_PATHS = 5;
+
+ Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ paths[i] = new Path[NUM_WRITER_PATHS];
+ for (int j = 0; j < NUM_WRITER_PATHS; j++) {
+ paths[i][j] =
+ new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
+ }
+ }
+
+ final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+ ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
+ testFailed);
+ executor.execute(writer);
+ }
+
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+ // Stop executor from adding new tasks to finish existing threads in queue
+ latch.await();
+
+ assertThat(testFailed.get(), is(false));
+ }
+
@Test (timeout=300000)
public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
@@ -384,11 +725,12 @@ public class TestLazyPersistFiles {
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
- * capped. If tmpfsStorageLimit < 0 then it is ignored.
+ * capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
- final long ramDiskStorageLimit)
+ final long ramDiskStorageLimit,
+ final boolean useSCR)
throws IOException {
conf = new Configuration();
@@ -397,11 +739,13 @@ public class TestLazyPersistFiles {
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
- LAZY_WRITER_INTERVAL_SEC);
+ LAZY_WRITER_INTERVAL_SEC);
+
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
- REPL_FACTOR = 1; //Reset if case a test has modified the value
+ REPL_FACTOR = 1; //Reset in case a test has modified the value
cluster = new MiniDFSCluster
.Builder(conf)
@@ -411,7 +755,7 @@ public class TestLazyPersistFiles {
fs = cluster.getFileSystem();
client = fs.getClient();
- // Artifically cap the storage capacity of the tmpfs volume.
+ // Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
@@ -426,6 +770,13 @@ public class TestLazyPersistFiles {
LOG.info("Cluster startup complete");
}
+ private void startUpCluster(final int numDataNodes,
+ final StorageType[] storageTypes,
+ final long ramDiskStorageLimit)
+ throws IOException {
+ startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
+ }
+
private void makeTestFile(Path path, long length, final boolean
isLazyPersist)
throws IOException {
@@ -435,9 +786,7 @@ public class TestLazyPersistFiles {
createFlags.add(LAZY_PERSIST);
}
-
FSDataOutputStream fos = null;
-
try {
fos =
fs.create(path,
@@ -465,13 +814,14 @@ public class TestLazyPersistFiles {
private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
+ LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
-
return locatedBlocks;
}
@@ -480,4 +830,119 @@ public class TestLazyPersistFiles {
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
}
+
+ private void makeRandomTestFile(Path path, long length, final boolean
isLazyPersist,
+ long seed) throws IOException {
+ DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
+ }
+
+ private boolean verifyReadRandomFile(
+ Path path, int fileLength, int seed) throws IOException {
+ byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(seed, fileLength);
+ return Arrays.equals(contents, expected);
+ }
+
+ private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+ throws IOException, InterruptedException {
+
+ LOG.info("Verifying replica has no saved copy after deletion.");
+ triggerBlockReport();
+
+ while(
+ DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+ > 0L){
+ Thread.sleep(1000);
+ }
+
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ // Make sure deleted replica does not have a copy on either finalized dir
of
+ // transient volume or finalized dir of non-transient volume
+ for (FsVolumeSpi v : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) v;
+ File targetDir = (v.isTransientStorage()) ?
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+ if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks
locatedBlocks) {
+
+ for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+ File targetDir =
+ DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+ File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+ if (blockFile.exists()) {
+ LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ File metaFile = new File(targetDir,
+ DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+ lb.getBlock().getGenerationStamp()));
+ if (metaFile.exists()) {
+ LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void triggerBlockReport()
+ throws IOException, InterruptedException {
+ // Trigger block report to NN
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ Thread.sleep(10 * 1000);
+ }
+
+ class WriterRunnable implements Runnable {
+ private final int id;
+ private final MiniDFSCluster cluster;
+ private final Path paths[];
+ private final int seed;
+ private CountDownLatch latch;
+ private AtomicBoolean bFail;
+
+ public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[]
paths,
+ int seed, CountDownLatch latch,
+ AtomicBoolean bFail) {
+ id = threadIndex;
+ this.cluster = cluster;
+ this.paths = paths;
+ this.seed = seed;
+ this.latch = latch;
+ this.bFail = bFail;
+ System.out.println("Creating Writer: " + id);
+ }
+
+ public void run() {
+ System.out.println("Writer " + id + " starting... ");
+ int i = 0;
+ try {
+ for (i = 0; i < paths.length; i++) {
+ makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
+ // eviction may faiL when all blocks are not persisted yet.
+ // ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+ }
+ } catch (IOException e) {
+ bFail.set(true);
+ LOG.error("Writer exception: writer id:" + id +
+ " testfile: " + paths[i].toString() +
+ " " + e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
}