This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 8396caa4840 HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697) 8396caa4840 is described below commit 8396caa4840338f54115e92f03082e6044840b73 Author: M1eyu2018 <44452470+m1eyu2...@users.noreply.github.com> AuthorDate: Thu Oct 27 19:03:15 2022 +0800 HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697) Reviewed-by: xuzq <15040255...@163.com> Signed-off-by: Tao Li <toms...@apache.org> --- .../main/java/org/apache/hadoop/fs/FileSystem.java | 33 ++++++++ .../org/apache/hadoop/fs/shell/CopyCommands.java | 19 ++++- .../org/apache/hadoop/fs/TestFilterFileSystem.java | 5 ++ .../org/apache/hadoop/fs/TestHarFileSystem.java | 5 ++ .../apache/hadoop/hdfs/DistributedFileSystem.java | 10 +++ .../java/org/apache/hadoop/hdfs/TestDFSShell.java | 91 ++++++++++++++++++++++ 6 files changed, 160 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 7582488e7f9..df853078461 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -1543,6 +1543,39 @@ public abstract class FileSystem extends Configured public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; + /** + * Append to an existing file (optional operation). + * @param f the existing file to be appended. + * @param appendToNewBlock whether to append data to a new block + * instead of the end of the last partial block + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + * @return output stream. 
+ */ + public FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException { + return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, + IO_FILE_BUFFER_SIZE_DEFAULT), null, appendToNewBlock); + } + + /** + * Append to an existing file (optional operation). + * This function is used for being overridden by some FileSystem like DistributedFileSystem + * @param f the existing file to be appended. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @param appendToNewBlock whether to append data to a new block + * instead of the end of the last partial block + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + * @return output stream. + */ + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress, boolean appendToNewBlock) throws IOException { + return append(f, bufferSize, progress); + } + /** * Concat existing files together. * @param trg the path to the target destination. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 0643a2e983d..1ac204f5f8a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -333,15 +333,24 @@ class CopyCommands { */ public static class AppendToFile extends CommandWithDestination { public static final String NAME = "appendToFile"; - public static final String USAGE = "<localsrc> ... <dst>"; + public static final String USAGE = "[-n] <localsrc> ... <dst>"; public static final String DESCRIPTION = "Appends the contents of all the given local files to the " + "given dst file. The dst file will be created if it does " + "not exist. 
If <localSrc> is -, then the input is read " + - "from stdin."; + "from stdin. Option -n represents that use NEW_BLOCK create flag to append file."; private static final int DEFAULT_IO_LENGTH = 1024 * 1024; boolean readStdin = false; + private boolean appendToNewBlock = false; + + public boolean isAppendToNewBlock() { + return appendToNewBlock; + } + + public void setAppendToNewBlock(boolean appendToNewBlock) { + this.appendToNewBlock = appendToNewBlock; + } // commands operating on local paths have no need for glob expansion @Override @@ -372,6 +381,9 @@ class CopyCommands { throw new IOException("missing destination argument"); } + CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "n"); + cf.parse(args); + appendToNewBlock = cf.getOpt("n"); getRemoteDestination(args); super.processOptions(args); } @@ -385,7 +397,8 @@ class CopyCommands { } InputStream is = null; - try (FSDataOutputStream fos = dst.fs.append(dst.path)) { + try (FSDataOutputStream fos = appendToNewBlock ? + dst.fs.append(dst.path, true) : dst.fs.append(dst.path)) { if (readStdin) { if (args.size() == 0) { IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index 5ed4d9bc9a7..3d8ea0e826c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -143,6 +143,11 @@ public class TestFilterFileSystem { of the filter such as checksums. 
*/ MultipartUploaderBuilder createMultipartUploader(Path basePath); + + FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException; + + FSDataOutputStream append(Path f, int bufferSize, + Progressable progress, boolean appendToNewBlock) throws IOException; } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 711ab94fdf1..b227e169088 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -250,6 +250,11 @@ public class TestHarFileSystem { MultipartUploaderBuilder createMultipartUploader(Path basePath) throws IOException; + + FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException; + + FSDataOutputStream append(Path f, int bufferSize, + Progressable progress, boolean appendToNewBlock) throws IOException; } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index ff2c5f37c69..9050a4bddee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -422,6 +422,16 @@ public class DistributedFileSystem extends FileSystem return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress); } + @Override + public FSDataOutputStream append(Path f, final int bufferSize, + final Progressable progress, boolean appendToNewBlock) throws IOException { + EnumSet<CreateFlag> flag = EnumSet.of(CreateFlag.APPEND); + if (appendToNewBlock) { + flag.add(CreateFlag.NEW_BLOCK); + } + return 
append(f, flag, bufferSize, progress); + } + /** * Append to an existing file (optional operation). * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java index a00f21ecc94..e54b7332b1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java @@ -37,6 +37,7 @@ import java.util.zip.GZIPOutputStream; import java.util.function.Supplier; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException; import org.apache.hadoop.util.Lists; import org.slf4j.Logger; @@ -3043,6 +3044,96 @@ public class TestDFSShell { assertThat(res, not(0)); } + @Test (timeout = 300000) + public void testAppendToFileWithOptionN() throws Exception { + final int inputFileLength = 1024 * 1024; + File testRoot = new File(TEST_ROOT_DIR, "testAppendToFileWithOptionN"); + testRoot.mkdirs(); + + File file1 = new File(testRoot, "file1"); + createLocalFileWithRandomData(inputFileLength, file1); + + Configuration conf = new HdfsConfiguration(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).build()) { + cluster.waitActive(); + FileSystem hdfs = cluster.getFileSystem(); + assertTrue("Not a HDFS: " + hdfs.getUri(), + hdfs instanceof DistributedFileSystem); + + // Run appendToFile with option n by replica policy once, make sure that the target file is + // created and is of the right size and block number is correct. 
+ String dir = "/replica"; + boolean mkdirs = hdfs.mkdirs(new Path(dir)); + assertTrue("Mkdir fail", mkdirs); + Path remoteFile = new Path(dir + "/remoteFile"); + FsShell shell = new FsShell(); + shell.setConf(conf); + String[] argv = new String[] { + "-appendToFile", "-n", file1.toString(), remoteFile.toString() }; + int res = ToolRunner.run(shell, argv); + assertEquals("Run appendToFile command fail", 0, res); + FileStatus fileStatus = hdfs.getFileStatus(remoteFile); + assertEquals("File size should be " + inputFileLength, + inputFileLength, fileStatus.getLen()); + BlockLocation[] fileBlockLocations = + hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + assertEquals("Block Num should be 1", 1, fileBlockLocations.length); + + // Run appendToFile with option n by replica policy again and + // make sure that the target file size has been doubled and block number has been doubled. + res = ToolRunner.run(shell, argv); + assertEquals("Run appendToFile command fail", 0, res); + fileStatus = hdfs.getFileStatus(remoteFile); + assertEquals("File size should be " + inputFileLength * 2, + inputFileLength * 2, fileStatus.getLen()); + fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + assertEquals("Block Num should be 2", 2, fileBlockLocations.length); + + // Before run appendToFile with option n by ec policy, set ec policy for the dir. + dir = "/ecPolicy"; + final String ecPolicyName = "RS-6-3-1024k"; + mkdirs = hdfs.mkdirs(new Path(dir)); + assertTrue("Mkdir fail", mkdirs); + ((DistributedFileSystem) hdfs).setErasureCodingPolicy(new Path(dir), ecPolicyName); + ErasureCodingPolicy erasureCodingPolicy = + ((DistributedFileSystem) hdfs).getErasureCodingPolicy(new Path(dir)); + assertEquals("Set ec policy fail", ecPolicyName, erasureCodingPolicy.getName()); + + // Run appendToFile with option n by ec policy once, make sure that the target file is + // created and is of the right size and block group number is correct. 
+ remoteFile = new Path(dir + "/remoteFile"); + argv = new String[] { + "-appendToFile", "-n", file1.toString(), remoteFile.toString() }; + res = ToolRunner.run(shell, argv); + assertEquals("Run appendToFile command fail", 0, res); + fileStatus = hdfs.getFileStatus(remoteFile); + assertEquals("File size should be " + inputFileLength, + inputFileLength, fileStatus.getLen()); + fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + assertEquals("Block Group Num should be 1", 1, fileBlockLocations.length); + + // Run appendToFile without option n by ec policy again and make sure that + // append on EC file without new block must fail. + argv = new String[] { + "-appendToFile", file1.toString(), remoteFile.toString() }; + res = ToolRunner.run(shell, argv); + assertTrue("Run appendToFile command must fail", res != 0); + + // Run appendToFile with option n by ec policy again and + // make sure that the target file size has been doubled + // and block group number has been doubled. + argv = new String[] { + "-appendToFile", "-n", file1.toString(), remoteFile.toString() }; + res = ToolRunner.run(shell, argv); + assertEquals("Run appendToFile command fail", 0, res); + fileStatus = hdfs.getFileStatus(remoteFile); + assertEquals("File size should be " + inputFileLength * 2, + inputFileLength * 2, fileStatus.getLen()); + fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + assertEquals("Block Group Num should be 2", 2, fileBlockLocations.length); + } + } + @Test (timeout = 30000) public void testSetXAttrPermission() throws Exception { UserGroupInformation user = UserGroupInformation. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org