http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index a3ac455..38fc637 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -633,15 +633,16 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol - public LastBlockWithStatus append(String src, String clientName) - throws IOException { + public LastBlockWithStatus append(String src, String clientName, + EnumSetWritable<CreateFlag> flag) throws IOException { checkNNStartup(); String clientMachine = getClientMachine(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.append: file " +src+" for "+clientName+" at "+clientMachine); } - CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); + CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, + null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (LastBlockWithStatus) cacheEntry.getPayload(); } @@ -649,7 +650,7 @@ class NameNodeRpcServer implements NamenodeProtocols { LastBlockWithStatus info = null; boolean success = false; try { - info = namesystem.appendFile(src, clientName, clientMachine, + info = namesystem.appendFile(src, clientName, clientMachine, flag.get(), cacheEntry != null); success = true; } finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 5c9f752..34564d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -66,6 +66,7 @@ enum CreateFlagProto { OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC APPEND = 0x04; // Append to a file LAZY_PERSIST = 0x10; // File with reduced durability guarantees. + NEW_BLOCK = 0x20; // Write data to a new block when appending } message CreateRequestProto { @@ -86,6 +87,7 @@ message CreateResponseProto { message AppendRequestProto { required string src = 1; required string clientName = 2; + optional uint32 flag = 3; // bits set using CreateFlag } message AppendResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto index e50f14b..5b78fe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto @@ -89,6 +89,7 @@ message CloseEventProto { message AppendEventProto { required string path = 1; + optional bool newBlock = 2 [default = false]; } message RenameEventProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java index eab44be..68a85b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java @@ -136,6 +136,22 @@ public class AppendTestUtil { } } + public static void check(DistributedFileSystem fs, Path p, int position, + int length) throws IOException { + byte[] buf = new byte[length]; + int i = 0; + try { + FSDataInputStream in = fs.open(p); + in.read(position, buf, 0, buf.length); + for(i = position; i < length + position; i++) { + assertEquals((byte) i, buf[i - position]); + } + in.close(); + } catch(IOException ioe) { + throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe); + } + } + /** * create a buffer that contains the entire test file data. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 0eef46f..126827a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1132,6 +1132,9 @@ public class DFSTestUtil { FSDataOutputStream s = filesystem.create(pathFileCreate); // OP_CLOSE 9 s.close(); + // OP_APPEND 47 + FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null); + s2.close(); // OP_SET_STORAGE_POLICY 45 filesystem.setStoragePolicy(pathFileCreate, HdfsConstants.HOT_STORAGE_POLICY_NAME); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java index 75a4ad4..4f449d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.XAttrSetFlag; @@ -71,7 +72,7 @@ public class TestDFSInotifyEventInputStream { */ @Test public void testOpcodeCount() { - Assert.assertEquals(48, FSEditLogOpCodes.values().length); + Assert.assertEquals(49, FSEditLogOpCodes.values().length); } @@ -109,7 +110,8 @@ public class TestDFSInotifyEventInputStream { os.write(new byte[BLOCK_SIZE]); os.close(); // CloseOp -> CloseEvent // AddOp -> AppendEvent - os = client.append("/file2", BLOCK_SIZE, null, null); + os = client.append("/file2", BLOCK_SIZE, EnumSet.of(CreateFlag.APPEND), + null, null); os.write(new byte[BLOCK_SIZE]); os.close(); // CloseOp -> CloseEvent Thread.sleep(10); // so that the atime will get updated on the next line @@ -182,13 +184,14 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce2.getFileSize() > 0); Assert.assertTrue(ce2.getTimestamp() > 0); - // AddOp + // AppendOp batch = waitForNextEvents(eis); Assert.assertEquals(1, batch.getEvents().length); txid = 
checkTxid(batch, txid); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND); Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0]; Assert.assertEquals("/file2", append2.getPath()); + Assert.assertFalse(append2.toNewBlock()); // CloseOp batch = waitForNextEvents(eis); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 34c701d..3cb72ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -25,10 +25,12 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.EnumSet; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HardLink; @@ -344,7 +346,46 @@ public class TestFileAppend{ cluster.shutdown(); } } + + /** Test two consecutive appends on a file with a full block. 
*/ + @Test + public void testAppend2Twice() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + final DistributedFileSystem fs1 = cluster.getFileSystem(); + final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); + try { + final Path p = new Path("/testAppendTwice/foo"); + final int len = 1 << 16; + final byte[] fileContents = AppendTestUtil.initBuffer(len); + + { + // create a new file with a full block. + FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); + out.write(fileContents, 0, len); + out.close(); + } + //1st append does not add any data so that the last block remains full + //and the last block in INodeFileUnderConstruction is a BlockInfo + //but not BlockInfoUnderConstruction. + ((DistributedFileSystem) fs2).append(p, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + + // 2nd append should get AlreadyBeingCreatedException + fs1.append(p); + Assert.fail(); + } catch(RemoteException re) { + AppendTestUtil.LOG.info("Got an exception:", re); + Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), + re.getClassName()); + } finally { + fs2.close(); + fs1.close(); + cluster.shutdown(); + } + } + /** Tests appending after soft-limit expires. */ @Test public void testAppendAfterSoftLimit() @@ -386,6 +427,54 @@ public class TestFileAppend{ } } + /** Tests appending after soft-limit expires. 
*/ + @Test + public void testAppend2AfterSoftLimit() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + //Set small soft-limit for lease + final long softLimit = 1L; + final long hardLimit = 9999999L; + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .build(); + cluster.setLeasePeriod(softLimit, hardLimit); + cluster.waitActive(); + + DistributedFileSystem fs = cluster.getFileSystem(); + DistributedFileSystem fs2 = new DistributedFileSystem(); + fs2.initialize(fs.getUri(), conf); + + final Path testPath = new Path("/testAppendAfterSoftLimit"); + final byte[] fileContents = AppendTestUtil.initBuffer(32); + + // create a new file without closing + FSDataOutputStream out = fs.create(testPath); + out.write(fileContents); + + //Wait for > soft-limit + Thread.sleep(250); + + try { + FSDataOutputStream appendStream2 = fs2.append(testPath, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + appendStream2.write(fileContents); + appendStream2.close(); + assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen()); + // make sure we now have 1 block since the first writer was revoked + LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(), + 0L); + assertEquals(1, blks.getLocatedBlocks().size()); + for (LocatedBlock blk : blks.getLocatedBlocks()) { + assertEquals(fileContents.length, blk.getBlockSize()); + } + } finally { + fs.close(); + fs2.close(); + cluster.shutdown(); + } + } + /** * Old replica of the block should not be accepted as valid for append/read */ @@ -439,4 +528,77 @@ public class TestFileAppend{ } } + /** + * Old replica of the block should not be accepted as valid for append/read + */ + @Test + public void testMultiAppend2() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", + "false"); + MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3) + .build(); + DistributedFileSystem fs = null; + final String hello = "hello\n"; + try { + fs = cluster.getFileSystem(); + Path path = new Path("/test"); + FSDataOutputStream out = fs.create(path); + out.writeBytes(hello); + out.close(); + + // stop one datanode + DataNodeProperties dnProp = cluster.stopDataNode(0); + String dnAddress = dnProp.datanode.getXferAddress().toString(); + if (dnAddress.startsWith("/")) { + dnAddress = dnAddress.substring(1); + } + + // append again to bump genstamps + for (int i = 0; i < 2; i++) { + out = fs.append(path, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + out.writeBytes(hello); + out.close(); + } + + // re-open and make the block state as underconstruction + out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), + 4096, null); + cluster.restartDataNode(dnProp, true); + // wait till the block report comes + Thread.sleep(2000); + out.writeBytes(hello); + out.close(); + // check the block locations + LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L); + // since we append the file 3 time, we should be 4 blocks + assertEquals(4, blocks.getLocatedBlocks().size()); + for (LocatedBlock block : blocks.getLocatedBlocks()) { + assertEquals(hello.length(), block.getBlockSize()); + } + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 4; i++) { + sb.append(hello); + } + final byte[] content = sb.toString().getBytes(); + AppendTestUtil.checkFullFile(fs, path, content.length, content, + "Read /test"); + + // restart namenode to make sure the editlog can be properly applied + cluster.restartNameNode(true); + cluster.waitActive(); + AppendTestUtil.checkFullFile(fs, path, content.length, content, + "Read /test"); + blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L); + // since we append the file 3 time, we should be 4 blocks + assertEquals(4, blocks.getLocatedBlocks().size()); + for (LocatedBlock block : 
blocks.getLocatedBlocks()) { + assertEquals(hello.length(), block.getBlockSize()); + } + } finally { + IOUtils.closeStream(fs); + cluster.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java index eecd23b..99d04dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -24,14 +25,18 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -67,11 +72,7 @@ public class TestFileAppend2 { final int numberOfFiles = 50; final int numThreads = 10; final int numAppendsPerThread = 20; -/*** - int numberOfFiles = 1; - int numThreads = 
1; - int numAppendsPerThread = 2000; -****/ + Workload[] workload = null; final ArrayList<Path> testFiles = new ArrayList<Path>(); volatile static boolean globalStatus = true; @@ -229,16 +230,170 @@ public class TestFileAppend2 { } } + /** + * Creates one file, writes a few bytes to it and then closed it. + * Reopens the same file for appending using append2 API, write all blocks and + * then close. Verify that all data exists in file. + */ + @Test + public void testSimpleAppend2() throws Exception { + final Configuration conf = new HdfsConfiguration(); + if (simulatedStorage) { + SimulatedFSDataset.setFactory(conf); + } + conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50); + fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + try { + { // test appending to a file. + // create a new file. + Path file1 = new Path("/simpleAppend.dat"); + FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1); + System.out.println("Created file simpleAppend.dat"); + + // write to file + int mid = 186; // io.bytes.per.checksum bytes + System.out.println("Writing " + mid + " bytes to file " + file1); + stm.write(fileContents, 0, mid); + stm.close(); + System.out.println("Wrote and Closed first part of file."); + + // write to file + int mid2 = 607; // io.bytes.per.checksum bytes + System.out.println("Writing " + mid + " bytes to file " + file1); + stm = fs.append(file1, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + stm.write(fileContents, mid, mid2-mid); + stm.close(); + System.out.println("Wrote and Closed second part of file."); + + // write the remainder of the file + stm = fs.append(file1, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + // ensure getPos is set to reflect existing size of the file + assertTrue(stm.getPos() > 0); + System.out.println("Writing " + 
(AppendTestUtil.FILE_SIZE - mid2) + + " bytes to file " + file1); + stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2); + System.out.println("Written second part of file"); + stm.close(); + System.out.println("Wrote and Closed second part of file."); + + // verify that entire file is good + AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE, + fileContents, "Read 2"); + // also make sure there three different blocks for the file + List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks( + file1.toString(), 0L).getLocatedBlocks(); + assertEquals(12, blocks.size()); // the block size is 1024 + assertEquals(mid, blocks.get(0).getBlockSize()); + assertEquals(mid2 - mid, blocks.get(1).getBlockSize()); + for (int i = 2; i < 11; i++) { + assertEquals(AppendTestUtil.BLOCK_SIZE, blocks.get(i).getBlockSize()); + } + assertEquals((AppendTestUtil.FILE_SIZE - mid2) + % AppendTestUtil.BLOCK_SIZE, blocks.get(11).getBlockSize()); + } + + { // test appending to an non-existing file. + FSDataOutputStream out = null; + try { + out = fs.append(new Path("/non-existing.dat"), + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + fail("Expected to have FileNotFoundException"); + } catch(java.io.FileNotFoundException fnfe) { + System.out.println("Good: got " + fnfe); + fnfe.printStackTrace(System.out); + } finally { + IOUtils.closeStream(out); + } + } + + { // test append permission. 
+ // set root to all writable + Path root = new Path("/"); + fs.setPermission(root, new FsPermission((short)0777)); + fs.close(); + + // login as a different user + final UserGroupInformation superuser = + UserGroupInformation.getCurrentUser(); + String username = "testappenduser"; + String group = "testappendgroup"; + assertFalse(superuser.getShortUserName().equals(username)); + assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group)); + UserGroupInformation appenduser = UserGroupInformation + .createUserForTesting(username, new String[] { group }); + + fs = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(appenduser, + conf); + + // create a file + Path dir = new Path(root, getClass().getSimpleName()); + Path foo = new Path(dir, "foo.dat"); + FSDataOutputStream out = null; + int offset = 0; + try { + out = fs.create(foo); + int len = 10 + AppendTestUtil.nextInt(100); + out.write(fileContents, offset, len); + offset += len; + } finally { + IOUtils.closeStream(out); + } + + // change dir and foo to minimal permissions. + fs.setPermission(dir, new FsPermission((short)0100)); + fs.setPermission(foo, new FsPermission((short)0200)); + + // try append, should success + out = null; + try { + out = fs.append(foo, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + int len = 10 + AppendTestUtil.nextInt(100); + out.write(fileContents, offset, len); + offset += len; + } finally { + IOUtils.closeStream(out); + } + + // change dir and foo to all but no write on foo. 
+ fs.setPermission(foo, new FsPermission((short)0577)); + fs.setPermission(dir, new FsPermission((short)0777)); + + // try append, should fail + out = null; + try { + out = fs.append(foo, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + fail("Expected to have AccessControlException"); + } catch(AccessControlException ace) { + System.out.println("Good: got " + ace); + ace.printStackTrace(System.out); + } finally { + IOUtils.closeStream(out); + } + } + } finally { + fs.close(); + cluster.shutdown(); + } + } + // // an object that does a bunch of appends to files // class Workload extends Thread { private final int id; private final MiniDFSCluster cluster; + private final boolean appendToNewBlock; - Workload(MiniDFSCluster cluster, int threadIndex) { + Workload(MiniDFSCluster cluster, int threadIndex, boolean append2) { id = threadIndex; this.cluster = cluster; + this.appendToNewBlock = append2; } // create a bunch of files. Write to them and then verify. @@ -261,7 +416,7 @@ public class TestFileAppend2 { long len = 0; int sizeToAppend = 0; try { - FileSystem fs = cluster.getFileSystem(); + DistributedFileSystem fs = cluster.getFileSystem(); // add a random number of bytes to file len = fs.getFileStatus(testfile).getLen(); @@ -285,7 +440,9 @@ public class TestFileAppend2 { " appending " + sizeToAppend + " bytes " + " to file " + testfile + " of size " + len); - FSDataOutputStream stm = fs.append(testfile); + FSDataOutputStream stm = appendToNewBlock ? 
fs.append(testfile, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) + : fs.append(testfile); stm.write(fileContents, (int)len, sizeToAppend); stm.close(); @@ -298,7 +455,7 @@ public class TestFileAppend2 { " expected size " + (len + sizeToAppend) + " waiting for namenode metadata update."); Thread.sleep(5000); - } catch (InterruptedException e) {;} + } catch (InterruptedException e) {} } assertTrue("File " + testfile + " size is " + @@ -306,7 +463,7 @@ public class TestFileAppend2 { " but expected " + (len + sizeToAppend), fs.getFileStatus(testfile).getLen() == (len + sizeToAppend)); - AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend), + AppendTestUtil.checkFullFile(fs, testfile, (int) (len + sizeToAppend), fileContents, "Read 2"); } catch (Throwable e) { globalStatus = false; @@ -331,10 +488,8 @@ public class TestFileAppend2 { /** * Test that appends to files at random offsets. - * @throws IOException an exception might be thrown */ - @Test - public void testComplexAppend() throws IOException { + private void testComplexAppend(boolean appendToNewBlock) throws IOException { fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); Configuration conf = new HdfsConfiguration(); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); @@ -366,7 +521,7 @@ public class TestFileAppend2 { // Create threads and make them run workload concurrently. 
workload = new Workload[numThreads]; for (int i = 0; i < numThreads; i++) { - workload[i] = new Workload(cluster, i); + workload[i] = new Workload(cluster, i, appendToNewBlock); workload[i].start(); } @@ -390,4 +545,14 @@ public class TestFileAppend2 { // assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus); } + + @Test + public void testComplexAppend() throws IOException { + testComplexAppend(false); + } + + @Test + public void testComplexAppend2() throws IOException { + testComplexAppend(true); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java index d5de0ff..9ebe115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java @@ -24,7 +24,10 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.util.EnumSet; +import java.util.List; +import org.apache.hadoop.fs.CreateFlag; import org.mockito.invocation.InvocationOnMock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -36,8 +39,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSClientAdapter; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -52,6 +53,7 @@ import 
org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.log4j.Level; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -121,6 +123,32 @@ public class TestFileAppend3 { AppendTestUtil.check(fs, p, len1 + len2); } + @Test + public void testTC1ForAppend2() throws Exception { + final Path p = new Path("/TC1/foo2"); + + //a. Create file and write one block of data. Close file. + final int len1 = (int) BLOCK_SIZE; + { + FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, + BLOCK_SIZE); + AppendTestUtil.write(out, 0, len1); + out.close(); + } + + // Reopen file to append. Append half block of data. Close file. + final int len2 = (int) BLOCK_SIZE / 2; + { + FSDataOutputStream out = fs.append(p, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + AppendTestUtil.write(out, len1, len2); + out.close(); + } + + // b. Reopen file and read 1.5 blocks worth of data. Close file. + AppendTestUtil.check(fs, p, len1 + len2); + } + /** * TC2: Append on non-block boundary. * @throws IOException an exception might be thrown @@ -152,6 +180,40 @@ public class TestFileAppend3 { AppendTestUtil.check(fs, p, len1 + len2); } + @Test + public void testTC2ForAppend2() throws Exception { + final Path p = new Path("/TC2/foo2"); + + //a. Create file with one and a half block of data. Close file. + final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2); + { + FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, + BLOCK_SIZE); + AppendTestUtil.write(out, 0, len1); + out.close(); + } + + AppendTestUtil.check(fs, p, len1); + + // Reopen file to append quarter block of data. Close file. + final int len2 = (int) BLOCK_SIZE / 4; + { + FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), + 4096, null); + AppendTestUtil.write(out, len1, len2); + out.close(); + } + + // b. 
Reopen file and read 1.75 blocks of data. Close file. + AppendTestUtil.check(fs, p, len1 + len2); + List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks( + p.toString(), 0L).getLocatedBlocks(); + Assert.assertEquals(3, blocks.size()); + Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize()); + Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize()); + Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize()); + } + /** * TC5: Only one simultaneous append. * @throws IOException an exception might be thrown @@ -179,18 +241,63 @@ public class TestFileAppend3 { AppendTestUtil.LOG.info("GOOD: got an exception", ioe); } + try { + ((DistributedFileSystem) AppendTestUtil + .createHdfsWithDifferentUsername(conf)).append(p, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + fail("This should fail."); + } catch(IOException ioe) { + AppendTestUtil.LOG.info("GOOD: got an exception", ioe); + } + //d. On Machine M1, close file. out.close(); } + @Test + public void testTC5ForAppend2() throws Exception { + final Path p = new Path("/TC5/foo2"); + + // a. Create file on Machine M1. Write half block to it. Close file. + { + FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, + BLOCK_SIZE); + AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2)); + out.close(); + } + + // b. Reopen file in "append" mode on Machine M1. + FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), + 4096, null); + + // c. On Machine M2, reopen file in "append" mode. This should fail. 
+ try { + ((DistributedFileSystem) AppendTestUtil + .createHdfsWithDifferentUsername(conf)).append(p, + EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); + fail("This should fail."); + } catch(IOException ioe) { + AppendTestUtil.LOG.info("GOOD: got an exception", ioe); + } + + try { + AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p); + fail("This should fail."); + } catch(IOException ioe) { + AppendTestUtil.LOG.info("GOOD: got an exception", ioe); + } + + // d. On Machine M1, close file. + out.close(); + } + /** * TC7: Corrupted replicas are present. * @throws IOException an exception might be thrown */ - @Test - public void testTC7() throws Exception { + private void testTC7(boolean appendToNewBlock) throws Exception { final short repl = 2; - final Path p = new Path("/TC7/foo"); + final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1")); System.out.println("p=" + p); //a. Create file with replication factor of 2. Write half block of data. Close file. @@ -224,7 +331,8 @@ public class TestFileAppend3 { //c. Open file in "append mode". Append a new block worth of data. Close file. final int len2 = (int)BLOCK_SIZE; { - FSDataOutputStream out = fs.append(p); + FSDataOutputStream out = appendToNewBlock ? + fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p); AppendTestUtil.write(out, len1, len2); out.close(); } @@ -233,13 +341,21 @@ public class TestFileAppend3 { AppendTestUtil.check(fs, p, len1 + len2); } + @Test + public void testTC7() throws Exception { + testTC7(false); + } + + @Test + public void testTC7ForAppend2() throws Exception { + testTC7(true); + } + /** * TC11: Racing rename - * @throws IOException an exception might be thrown */ - @Test - public void testTC11() throws Exception { - final Path p = new Path("/TC11/foo"); + private void testTC11(boolean appendToNewBlock) throws Exception { + final Path p = new Path("/TC11/foo" + (appendToNewBlock ? 
"0" : "1")); System.out.println("p=" + p); //a. Create file and write one block of data. Close file. @@ -251,7 +367,9 @@ public class TestFileAppend3 { } //b. Reopen file in "append" mode. Append half block of data. - FSDataOutputStream out = fs.append(p); + FSDataOutputStream out = appendToNewBlock ? + fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : + fs.append(p); final int len2 = (int)BLOCK_SIZE/2; AppendTestUtil.write(out, len1, len2); out.hflush(); @@ -283,13 +401,21 @@ public class TestFileAppend3 { } } + @Test + public void testTC11() throws Exception { + testTC11(false); + } + + @Test + public void testTC11ForAppend2() throws Exception { + testTC11(true); + } + /** * TC12: Append to partial CRC chunk - * @throws IOException an exception might be thrown */ - @Test - public void testTC12() throws Exception { - final Path p = new Path("/TC12/foo"); + private void testTC12(boolean appendToNewBlock) throws Exception { + final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1")); System.out.println("p=" + p); //a. Create file with a block size of 64KB @@ -305,23 +431,43 @@ public class TestFileAppend3 { //b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file. final int len2 = 5877; { - FSDataOutputStream out = fs.append(p); + FSDataOutputStream out = appendToNewBlock ? + fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : + fs.append(p); AppendTestUtil.write(out, len1, len2); out.close(); } //c. Reopen file and read 25687+5877 bytes of data from file. Close file. 
AppendTestUtil.check(fs, p, len1 + len2); + if (appendToNewBlock) { + LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0); + Assert.assertEquals(2, blks.getLocatedBlocks().size()); + Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize()); + Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize()); + AppendTestUtil.check(fs, p, 0, len1); + AppendTestUtil.check(fs, p, len1, len2); + } } - - /** Append to a partial CRC chunk and - * the first write does not fill up the partial CRC trunk - * * - * @throws IOException - */ + @Test - public void testAppendToPartialChunk() throws IOException { - final Path p = new Path("/partialChunk/foo"); + public void testTC12() throws Exception { + testTC12(false); + } + + @Test + public void testTC12ForAppend2() throws Exception { + testTC12(true); + } + + /** + * Append to a partial CRC chunk and the first write does not fill up the + * partial CRC chunk + */ + private void testAppendToPartialChunk(boolean appendToNewBlock) + throws IOException { + final Path p = new Path("/partialChunk/foo" + + (appendToNewBlock ? "0" : "1")); final int fileLen = 513; System.out.println("p=" + p); @@ -336,7 +482,9 @@ public class TestFileAppend3 { System.out.println("Wrote 1 byte and closed the file " + p); // append to file - stm = fs.append(p); + stm = appendToNewBlock ? + fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : + fs.append(p); // Append to a partial CRC trunk stm.write(fileContents, 1, 1); stm.hflush(); @@ -345,7 +493,9 @@ public class TestFileAppend3 { System.out.println("Append 1 byte and closed the file " + p); // write the remainder of the file - stm = fs.append(p); + stm = appendToNewBlock ? 
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : + fs.append(p); // ensure getPos is set to reflect existing size of the file assertEquals(2, stm.getPos()); @@ -444,4 +594,14 @@ public class TestFileAppend3 { // if append was called with a stale file stat. doSmallAppends(file, fs, 20); } + + @Test + public void testAppendToPartialChunk() throws IOException { + testAppendToPartialChunk(false); + } + + @Test + public void testAppendToPartialChunkforAppend2() throws IOException { + testAppendToPartialChunk(true); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java index 0bca23d..a2b344c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java @@ -99,10 +99,11 @@ public class TestFileAppendRestart { // OP_ADD to create file // OP_ADD_BLOCK for first block // OP_CLOSE to close file - // OP_ADD to reopen file + // OP_APPEND to reopen file // OP_ADD_BLOCK for second block // OP_CLOSE to close file - assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held); + assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_ADD).held); + assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held); assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held); assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held); @@ -112,13 +113,14 @@ public class TestFileAppendRestart { // OP_ADD to create file // OP_ADD_BLOCK for first block // OP_CLOSE to close file - // OP_ADD to re-establish the lease + 
// OP_APPEND to re-establish the lease // OP_UPDATE_BLOCKS from the updatePipeline call (increments genstamp of last block) // OP_ADD_BLOCK at the start of the second block // OP_CLOSE to close file // Total: 2 OP_ADDs, 1 OP_UPDATE_BLOCKS, 2 OP_ADD_BLOCKs, and 2 OP_CLOSEs // in addition to the ones above - assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held); + assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held); + assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held); assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_UPDATE_BLOCKS).held); assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held); assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java index 9ada95f..6bcfa71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java @@ -31,7 +31,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; import org.junit.Test; @@ -121,7 +123,66 @@ public class TestHFlush { cluster.shutdown(); } } - + + /** + * Test hsync with END_BLOCK flag. 
+ */ + @Test + public void hSyncEndBlock_00() throws IOException { + final int preferredBlockSize = 1024; + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .build(); + DistributedFileSystem fileSystem = cluster.getFileSystem(); + FSDataOutputStream stm = null; + try { + Path path = new Path("/" + fName); + stm = fileSystem.create(path, true, 4096, (short) 2, + AppendTestUtil.BLOCK_SIZE); + System.out.println("Created file " + path.toString()); + ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet + .of(SyncFlag.END_BLOCK)); + long currentFileLength = fileSystem.getFileStatus(path).getLen(); + assertEquals(0L, currentFileLength); + LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0); + assertEquals(0, blocks.getLocatedBlocks().size()); + + // write a block and call hsync(end_block) at the block boundary + stm.write(new byte[preferredBlockSize]); + ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet + .of(SyncFlag.END_BLOCK)); + currentFileLength = fileSystem.getFileStatus(path).getLen(); + assertEquals(preferredBlockSize, currentFileLength); + blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0); + assertEquals(1, blocks.getLocatedBlocks().size()); + + // call hsync then call hsync(end_block) immediately + stm.write(new byte[preferredBlockSize / 2]); + stm.hsync(); + ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet + .of(SyncFlag.END_BLOCK)); + currentFileLength = fileSystem.getFileStatus(path).getLen(); + assertEquals(preferredBlockSize + preferredBlockSize / 2, + currentFileLength); + blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0); + assertEquals(2, blocks.getLocatedBlocks().size()); + + stm.write(new byte[preferredBlockSize / 4]); + stm.hsync(); + currentFileLength = fileSystem.getFileStatus(path).getLen(); + assertEquals(preferredBlockSize + 
preferredBlockSize / 2 + + preferredBlockSize / 4, currentFileLength); + blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0); + assertEquals(3, blocks.getLocatedBlocks().size()); + } finally { + IOUtils.cleanup(null, stm, fileSystem); + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * The test calls * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} @@ -136,6 +197,29 @@ public class TestHFlush { /** * The test calls * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} + * while requiring the semantic of {@link SyncFlag#END_BLOCK}. + */ + @Test + public void hSyncEndBlock_01() throws IOException { + doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE, + (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK)); + } + + /** + * The test calls + * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} + * while requiring the semantic of {@link SyncFlag#END_BLOCK} and + * {@link SyncFlag#UPDATE_LENGTH}. + */ + @Test + public void hSyncEndBlockAndUpdateLength() throws IOException { + doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE, + (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH)); + } + + /** + * The test calls + * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}. 
* Similar with {@link #hFlush_02()} , it writes a file with a custom block * size so the writes will be happening across block' boundaries @@ -152,7 +236,20 @@ public class TestHFlush { doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH)); } - + + @Test + public void hSyncEndBlock_02() throws IOException { + Configuration conf = new HdfsConfiguration(); + int customPerChecksumSize = 512; + int customBlockSize = customPerChecksumSize * 3; + // Modify default filesystem settings + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); + + doTheJob(conf, fName, customBlockSize, (short) 2, true, + EnumSet.of(SyncFlag.END_BLOCK)); + } + /** * The test calls * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)} @@ -173,7 +270,20 @@ public class TestHFlush { doTheJob(conf, fName, customBlockSize, (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH)); } - + + @Test + public void hSyncEndBlock_03() throws IOException { + Configuration conf = new HdfsConfiguration(); + int customPerChecksumSize = 400; + int customBlockSize = customPerChecksumSize * 3; + // Modify default filesystem settings + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); + + doTheJob(conf, fName, customBlockSize, (short) 2, true, + EnumSet.of(SyncFlag.END_BLOCK)); + } + /** * The method starts new cluster with defined Configuration; creates a file * with specified block_size and writes 10 equal sections in it; it also calls @@ -197,12 +307,13 @@ public class TestHFlush { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(replicas).build(); // Make sure we work with DFS in order to utilize all its functionality - DistributedFileSystem fileSystem = - cluster.getFileSystem(); + DistributedFileSystem fileSystem = cluster.getFileSystem(); 
FSDataInputStream is; try { Path path = new Path(fileName); + final String pathName = new Path(fileSystem.getWorkingDirectory(), path) + .toUri().getPath(); FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas, block_size); System.out.println("Created file " + fileName); @@ -210,7 +321,8 @@ public class TestHFlush { int tenth = AppendTestUtil.FILE_SIZE/SECTIONS; int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS; for (int i=0; i<SECTIONS; i++) { - System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName); + System.out.println("Writing " + (tenth * i) + " to " + + (tenth * (i + 1)) + " section to file " + fileName); // write to the file stm.write(fileContent, tenth * i, tenth); @@ -227,7 +339,11 @@ public class TestHFlush { assertEquals( "File size doesn't match for hsync/hflush with updating the length", tenth * (i + 1), currentFileLength); + } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) { + LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0); + assertEquals(i + 1, blocks.getLocatedBlocks().size()); } + byte [] toRead = new byte[tenth]; byte [] expected = new byte[tenth]; System.arraycopy(fileContent, tenth * i, expected, 0, tenth); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java index b84989f..15580a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java @@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue; import java.io.File; import 
java.io.IOException; import java.io.RandomAccessFile; +import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Test; @@ -124,7 +127,8 @@ public class TestLeaseRecovery { } DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName); - cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName); + cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName, + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); // expire lease to trigger block recovery. 
waitLeaseRecovery(cluster); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index 84ac2a5..a4df4ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -28,6 +29,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.util.EnumSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -234,7 +236,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { makeTestFile(path, BLOCK_SIZE, true); try { - client.append(path.toString(), BUFFER_LENGTH, null, null).close(); + client.append(path.toString(), BUFFER_LENGTH, + EnumSet.of(CreateFlag.APPEND), null, null).close(); fail("Append to LazyPersist file did not fail as expected"); } catch (Throwable t) { LOG.info("Got expected exception ", t); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java index 6d1f452..ddf5a3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java @@ -40,9 +40,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -99,7 +102,7 @@ public class TestHDFSConcat { HdfsFileStatus fStatus; FSDataInputStream stm; - String trg = new String("/trg"); + String trg = "/trg"; Path trgPath = new Path(trg); DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1); fStatus = nn.getFileInfo(trg); @@ -112,7 +115,7 @@ public class TestHDFSConcat { long [] lens = new long [numFiles]; - int i = 0; + int i; for(i=0; i<files.length; i++) { files[i] = new Path("/file"+i); Path path = files[i]; @@ -385,6 +388,75 @@ public class TestHDFSConcat { } catch (Exception e) { // exspected } - + } + + /** + * make sure we update the quota correctly after concat + */ + @Test + public void testConcatWithQuotaDecrease() throws IOException { + final short srcRepl = 3; // note this is different with REPL_FACTOR + final int srcNum = 10; + final Path foo = new Path("/foo"); + final Path[] srcs = new Path[srcNum]; + final 
Path target = new Path(foo, "target"); + DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L); + + dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); + + for (int i = 0; i < srcNum; i++) { + srcs[i] = new Path(foo, "src" + i); + DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L); + } + + ContentSummary summary = dfs.getContentSummary(foo); + Assert.assertEquals(11, summary.getFileCount()); + Assert.assertEquals(blockSize * REPL_FACTOR + + blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed()); + + dfs.concat(target, srcs); + summary = dfs.getContentSummary(foo); + Assert.assertEquals(1, summary.getFileCount()); + Assert.assertEquals( + blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum, + summary.getSpaceConsumed()); + } + + @Test + public void testConcatWithQuotaIncrease() throws IOException { + final short repl = 3; + final int srcNum = 10; + final Path foo = new Path("/foo"); + final Path bar = new Path(foo, "bar"); + final Path[] srcs = new Path[srcNum]; + final Path target = new Path(bar, "target"); + DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L); + + final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR; + dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota); + + for (int i = 0; i < srcNum; i++) { + srcs[i] = new Path(bar, "src" + i); + DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L); + } + + ContentSummary summary = dfs.getContentSummary(bar); + Assert.assertEquals(11, summary.getFileCount()); + Assert.assertEquals(dsQuota, summary.getSpaceConsumed()); + + try { + dfs.concat(target, srcs); + fail("QuotaExceededException expected"); + } catch (RemoteException e) { + Assert.assertTrue( + e.unwrapRemoteException() instanceof QuotaExceededException); + } + + dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); + dfs.concat(target, srcs); + summary = dfs.getContentSummary(bar); + Assert.assertEquals(1, summary.getFileCount()); + Assert.assertEquals(blockSize * repl 
* (srcNum + 1), + summary.getSpaceConsumed()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java index 3084f26..2e6b4a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -232,14 +232,18 @@ public class TestNamenodeRetryCache { // Retried append requests succeed newCall(); - LastBlockWithStatus b = nnRpc.append(src, "holder"); - Assert.assertEquals(b, nnRpc.append(src, "holder")); - Assert.assertEquals(b, nnRpc.append(src, "holder")); + LastBlockWithStatus b = nnRpc.append(src, "holder", + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); + Assert.assertEquals(b, nnRpc.append(src, "holder", + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)))); + Assert.assertEquals(b, nnRpc.append(src, "holder", + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)))); // non-retried call fails newCall(); try { - nnRpc.append(src, "holder"); + nnRpc.append(src, "holder", + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); Assert.fail("testAppend - expected exception is not thrown"); } catch (Exception e) { // Expected @@ -409,7 +413,7 @@ public class TestNamenodeRetryCache { LightWeightCache<CacheEntry, CacheEntry> cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet(); - assertEquals(24, cacheSet.size()); + assertEquals(25, cacheSet.size()); Map<CacheEntry, CacheEntry> oldEntries = new 
HashMap<CacheEntry, CacheEntry>(); @@ -428,7 +432,7 @@ public class TestNamenodeRetryCache { assertTrue(namesystem.hasRetryCache()); cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem .getRetryCache().getCacheSet(); - assertEquals(24, cacheSet.size()); + assertEquals(25, cacheSet.size()); iter = cacheSet.iterator(); while (iter.hasNext()) { CacheEntry entry = iter.next(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index 066fd66..916893c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -163,7 +163,7 @@ public class TestRetryCacheWithHA { FSNamesystem fsn0 = cluster.getNamesystem(0); LightWeightCache<CacheEntry, CacheEntry> cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet(); - assertEquals(24, cacheSet.size()); + assertEquals(25, cacheSet.size()); Map<CacheEntry, CacheEntry> oldEntries = new HashMap<CacheEntry, CacheEntry>(); @@ -184,7 +184,7 @@ public class TestRetryCacheWithHA { FSNamesystem fsn1 = cluster.getNamesystem(1); cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1 .getRetryCache().getCacheSet(); - assertEquals(24, cacheSet.size()); + assertEquals(25, cacheSet.size()); iter = cacheSet.iterator(); while (iter.hasNext()) { CacheEntry entry = iter.next(); @@ -438,7 +438,8 @@ public class TestRetryCacheWithHA { @Override void invoke() throws Exception { - lbk = 
client.getNamenode().append(fileName, client.getClientName()); + lbk = client.getNamenode().append(fileName, client.getClientName(), + new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); } // check if the inode of the file is under construction @@ -701,7 +702,8 @@ public class TestRetryCacheWithHA { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction - out = this.client.append(file, BlockSize, null, null); + out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND), + null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored index dce3f47..da8c190 100644 Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ