Author: atm Date: Tue Jan 7 00:24:09 2014 New Revision: 1556082 URL: http://svn.apache.org/r1556082 Log: HDFS-5685. DistCp will fail to copy with -delete switch. Contributed by Yongjun Zhang.
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1556082&r1=1556081&r2=1556082&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Tue Jan 7 00:24:09 2014 @@ -181,6 +181,9 @@ Release 1.3.0 - unreleased MAPREDUCE-5698. Backport MAPREDUCE-1285 to branch-1 (Yongjun Zhang via Sandy Ryza) + HDFS-5685. DistCp will fail to copy with -delete switch. (Yongjun Zhang + via atm) + Release 1.2.2 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=1556082&r1=1556081&r2=1556082&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Jan 7 00:24:09 2014 @@ -49,6 +49,10 @@ import org.apache.log4j.Level; * A JUnit test for copying files recursively. */ public class TestCopyFiles extends TestCase { + + private static final String JT_STAGING_AREA_ROOT = "mapreduce.jobtracker.staging.root.dir"; + private static final String JT_STAGING_AREA_ROOT_DEFAULT = "/tmp/hadoop/mapred/staging"; + { ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange") ).getLogger().setLevel(Level.OFF); @@ -56,8 +60,9 @@ public class TestCopyFiles extends TestC ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF); ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL); } - - static final URI LOCAL_FS = URI.create("file:///"); + + private static final String LOCAL_FS_STR = "file:///"; + private static final URI LOCAL_FS_URI = URI.create(LOCAL_FS_STR); private static final Random RAN = new Random(); private static final int NFILES = 20; @@ -255,11 +260,11 @@ public class TestCopyFiles extends TestC /** copy files from local file system to local file system */ public void testCopyFromLocalToLocal() throws Exception { Configuration conf = new Configuration(); - FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); + FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf); + MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat"); ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); + new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"}); assertTrue("Source and destination directories do not match.", checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); deldir(localfs, TEST_ROOT_DIR+"/destdat"); @@ -305,11 +310,11 @@ public class TestCopyFiles extends TestC final FileSystem hdfs = cluster.getFileSystem(); final String namenode = hdfs.getUri().toString(); if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); + MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat"); ToolRunner.run(new DistCp(conf), new String[] { "-log", namenode+"/logs", - "file:///"+TEST_ROOT_DIR+"/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat", namenode+"/destdat"}); assertTrue("Source and destination directories do not match.", checkFiles(cluster.getFileSystem(), "/destdat", files)); @@ -317,7 +322,7 @@ public class TestCopyFiles extends TestC hdfs.exists(new Path(namenode+"/logs"))); deldir(hdfs, "/destdat"); deldir(hdfs, "/logs"); - deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat"); + deldir(FileSystem.get(LOCAL_FS_URI, conf), TEST_ROOT_DIR+"/srcdat"); } } finally { if (cluster != null) { cluster.shutdown(); } @@ -329,7 +334,7 @@ public class TestCopyFiles extends TestC MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); - final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); + final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf); cluster = new MiniDFSCluster(conf, 1, true, null); final FileSystem hdfs = cluster.getFileSystem(); final String namenode = FileSystem.getDefaultUri(conf).toString(); @@ -339,7 +344,7 @@ public class TestCopyFiles extends TestC "-log", "/logs", namenode+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); + LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"}); assertTrue("Source and destination directories do not match.", checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); assertTrue("Log directory does not exist.", @@ -413,7 +418,7 @@ public class TestCopyFiles extends TestC } } - public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception { + public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception { MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); @@ -488,21 +493,119 @@ public class TestCopyFiles extends TestC } } + /** + * A helper function to test copying files between local file system and dfs + * file system, with staging area set to local file system. + */ + private void stagingAreaTest(final FileSystem srcFs, final FileSystem destFs, + MiniDFSCluster cluster, Configuration conf) throws Exception { + try { + final String fileDir = "/files"; + final String srcParent = "/srcdat"; + final String destParent = "/destdata"; + final String source = srcParent + fileDir; + final String destination = destParent + fileDir; + final String logs = "/logs"; + String logDir = TEST_ROOT_DIR + logs; + + URI srcUri = srcFs.getUri(); + URI destUri = destFs.getUri(); + + final boolean isSrcLocalFs = srcUri.getScheme().equals(LOCAL_FS_URI.getScheme()); + + final FileSystem localFs = FileSystem.get(LOCAL_FS_URI, conf); + String prevStagingArea = + conf.get(JT_STAGING_AREA_ROOT, JT_STAGING_AREA_ROOT_DEFAULT); + String newStagingArea = (isSrcLocalFs? source : destination); + newStagingArea += "/STAGING"; + conf.set(JT_STAGING_AREA_ROOT, TEST_ROOT_DIR + newStagingArea); + + final String srcParentPrefix = isSrcLocalFs? TEST_ROOT_DIR : ""; + final String destParentPrefix = isSrcLocalFs? "" : TEST_ROOT_DIR; + + String createDelSrcParent = srcParentPrefix + srcParent; + String createDelDestParent = destParentPrefix + destParent; + String createDelSrc = createDelSrcParent + fileDir; + String createDelDest = createDelDestParent + fileDir; + + MyFile[] srcFiles = createFiles(srcUri, createDelSrc); + createFiles(destUri, createDelDest); + + String distcpSrc = String.valueOf(srcUri) + createDelSrc; + String distcpDest = String.valueOf(destUri) + createDelDest; + + ToolRunner.run(new DistCp(conf), new String[] { + "-log", + LOCAL_FS_STR + logDir, + "-update", + "-delete", + distcpSrc, + distcpDest}); + + assertTrue("Source and destination directories do not match.", + checkFiles(destFs, createDelDest, srcFiles)); + + deldir(localFs, logDir); + deldir(srcFs, createDelSrcParent); + deldir(destFs, createDelDestParent); + + conf.set(JT_STAGING_AREA_ROOT, prevStagingArea); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * test copying files from local file system to dfs file system with staging + * area in src + */ + public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); + + String namenode = FileSystem.getDefaultUri(conf).toString(); + assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://")); + + final FileSystem srcFs = FileSystem.get(LOCAL_FS_URI, conf); + final FileSystem destFs = cluster.getFileSystem(); + + stagingAreaTest(srcFs, destFs, cluster, conf); + } + + /** + * test copying files from dfs file system to local file system with staging + * area in dest + */ + public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); + + String namenode = FileSystem.getDefaultUri(conf).toString(); + assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://")); + + final FileSystem srcFs = cluster.getFileSystem(); + final FileSystem destFs = FileSystem.get(LOCAL_FS_URI, conf); + + stagingAreaTest(srcFs, destFs, cluster, conf); + } + public void testCopyDuplication() throws Exception { - final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration()); + final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, new Configuration()); try { MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat"); ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat"}); + new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat"}); assertTrue("Source and destination directories do not match.", checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files)); assertEquals(DistCp.DuplicationException.ERROR_CODE, ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat",})); + new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat",})); } finally { deldir(localfs, TEST_ROOT_DIR+"/destdat"); @@ -512,14 +615,14 @@ public class TestCopyFiles extends TestC } public void testCopySingleFile() throws Exception { - FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration()); + FileSystem fs = FileSystem.get(LOCAL_FS_URI, new Configuration()); Path root = new Path(TEST_ROOT_DIR+"/srcdat"); try { MyFile[] files = {createFile(root, fs)}; //copy a dir with a single file ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); + new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat", + LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"}); assertTrue("Source and destination directories do not match.", checkFiles(fs, TEST_ROOT_DIR+"/destdat", files)); @@ -528,8 +631,8 @@ public class TestCopyFiles extends TestC Path p = new Path(root, fname); FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p)); ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname, - "file:///"+TEST_ROOT_DIR+"/dest2/"+fname}); + new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+fname, + LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"+fname}); assertTrue("Source and destination directories do not match.", checkFiles(fs, TEST_ROOT_DIR+"/dest2", files)); //copy single file to existing dir @@ -539,16 +642,16 @@ public class TestCopyFiles extends TestC String sname = files2[0].getName(); ToolRunner.run(new DistCp(new Configuration()), new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); + LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname, + LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"}); assertTrue("Source and destination directories do not match.", checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1); //copy single file to existing dir w/ dst name conflict ToolRunner.run(new DistCp(new Configuration()), new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); + LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname, + LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"}); assertTrue("Source and destination directories do not match.", checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); } @@ -917,7 +1020,7 @@ public class TestCopyFiles extends TestC MiniDFSCluster cluster = null; try { Configuration conf = new Configuration(); - final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); + final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf); cluster = new MiniDFSCluster(conf, 1, true, null); final FileSystem hdfs = cluster.getFileSystem(); final String namenode = FileSystem.getDefaultUri(conf).toString(); @@ -927,7 +1030,7 @@ public class TestCopyFiles extends TestC MyFile[] localFiles = createFiles(localfs, destdir); ToolRunner.run(new DistCp(conf), new String[] { "-delete", "-update", "-log", "/logs", namenode + "/srcdat", - "file:///" + TEST_ROOT_DIR + "/destdat" }); + LOCAL_FS_STR + TEST_ROOT_DIR + "/destdat" }); assertTrue("Source and destination directories do not match.", checkFiles(localfs, destdir, files)); assertTrue("Log directory does not exist.", Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1556082&r1=1556081&r2=1556082&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java (original) +++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java Tue Jan 7 00:24:09 2014 @@ -1091,14 +1091,21 @@ public class DistCp implements Tool { (args.srcs.size() == 1 && !dstExists) || update || overwrite; int srcCount = 0, cnsyncf = 0, dirsyn = 0; long fileCount = 0L, byteCount = 0L, cbsyncs = 0L; + final FileStatus jobDirStat = jobfs.getFileStatus(jobDirectory); try { for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) { final Path src = srcItr.next(); FileSystem srcfs = src.getFileSystem(conf); FileStatus srcfilestat = srcfs.getFileStatus(src); Path root = special && srcfilestat.isDir()? src: src.getParent(); + final boolean needToFilterJobDir = srcfs.equals(jobfs); + if (srcfilestat.isDir()) { - ++srcCount; + if (needToFilterJobDir && (srcfilestat.compareTo(jobDirStat) == 0)) { + continue; + } else { + ++srcCount; + } } Stack<FileStatus> pathstack = new Stack<FileStatus>(); @@ -1109,12 +1116,17 @@ public class DistCp implements Tool { boolean skipfile = false; final FileStatus child = children[i]; final String dst = makeRelative(root, child.getPath()); - ++srcCount; if (child.isDir()) { - pathstack.push(child); + if (needToFilterJobDir && (child.compareTo(jobDirStat) == 0)) { + continue; + } else { + ++srcCount; + pathstack.push(child); + } } else { + ++srcCount; //skip file if the src and the dst files are the same. skipfile = update && sameFile(srcfs, child, dstfs, @@ -1272,17 +1284,22 @@ public class DistCp implements Tool { + ") is not a directory."); } - //write dst lsr results + // write dst lsr results + final boolean needToFilterJobDir = dstfs.equals(jobfs); + final FileStatus jobDirStat = jobfs.getFileStatus(jobdir); final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf, dstlsr, Text.class, dstroot.getClass(), SequenceFile.CompressionType.NONE); try { - //do lsr to get all file statuses in dstroot + // do lsr to get all file statuses in dstroot final Stack<FileStatus> lsrstack = new Stack<FileStatus>(); for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) { final FileStatus status = lsrstack.pop(); if (status.isDir()) { + if (needToFilterJobDir && (status.compareTo(jobDirStat) == 0)) { + continue; + } for(FileStatus child : dstfs.listStatus(status.getPath())) { String relative = makeRelative(dstroot.getPath(), child.getPath()); writer.append(new Text(relative), child); @@ -1294,20 +1311,21 @@ public class DistCp implements Tool { checkAndClose(writer); } - //sort lsr results + // sort lsr results final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted"); SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs, new Text.Comparator(), Text.class, FileStatus.class, jobconf); sorter.sort(dstlsr, sortedlsr); - //compare lsr list and dst list + // compare lsr list and dst list + final String jobDirStr = jobdir.toString(); SequenceFile.Reader lsrin = null; SequenceFile.Reader dstin = null; try { lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf); dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf); - //compare sorted lsr list and sorted dst list + // compare sorted lsr list and sorted dst list final Text lsrpath = new Text(); final FileStatus lsrstatus = new FileStatus(); final Text dstpath = new Text(); @@ -1317,6 +1335,12 @@ public class DistCp implements Tool { boolean hasnext = dstin.next(dstpath, dstfrom); for(; lsrin.next(lsrpath, lsrstatus); ) { + // + // check if lsrpath is in dst (represented here by dstsorted, which + // contains files and dirs to be copied from the source to destination), + // delete it if it doesn't exist in dst AND it's not jobDir or jobDir's + // ancestor. + // int dst_cmp_lsr = dstpath.compareTo(lsrpath); for(; hasnext && dst_cmp_lsr < 0; ) { hasnext = dstin.next(dstpath, dstfrom); @@ -1324,12 +1348,22 @@ public class DistCp implements Tool { } if (dst_cmp_lsr == 0) { - //lsrpath exists in dst, skip it + // lsrpath exists in dst, skip it hasnext = dstin.next(dstpath, dstfrom); } else { - //lsrpath does not exist, delete it + // lsrpath does not exist in dst, delete it if it's not jobDir or + // jobDir's ancestor String s = new Path(dstroot.getPath(), lsrpath.toString()).toString(); + if (needToFilterJobDir) { + int cmpJobDir = s.compareTo(jobDirStr); + if (cmpJobDir > 0) { + // do nothing + } else if (cmpJobDir == 0 || isAncestorPath(s, jobDirStr)) { + continue; + } + } + if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) { shellargs[1] = s; int r = 0;