Author: atm
Date: Tue Jan  7 00:24:09 2014
New Revision: 1556082

URL: http://svn.apache.org/r1556082
Log:
HDFS-5685. DistCp will fail to copy with -delete switch. Contributed by Yongjun Zhang.

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Jan  7 00:24:09 2014
@@ -181,6 +181,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5698. Backport MAPREDUCE-1285 to branch-1 (Yongjun Zhang via
     Sandy Ryza)
 
+    HDFS-5685. DistCp will fail to copy with -delete switch. (Yongjun Zhang
+    via atm)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Jan  7 00:24:09 2014
@@ -49,6 +49,10 @@ import org.apache.log4j.Level;
  * A JUnit test for copying files recursively.
  */
 public class TestCopyFiles extends TestCase {
+  
+  private static final String JT_STAGING_AREA_ROOT = "mapreduce.jobtracker.staging.root.dir";
+  private static final String JT_STAGING_AREA_ROOT_DEFAULT = "/tmp/hadoop/mapred/staging";
+
   {
     ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
         ).getLogger().setLevel(Level.OFF);
@@ -56,8 +60,9 @@ public class TestCopyFiles extends TestC
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
     ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
   }
-  
-  static final URI LOCAL_FS = URI.create("file:///");
+
+  private static final String LOCAL_FS_STR = "file:///";
+  private static final URI LOCAL_FS_URI = URI.create(LOCAL_FS_STR);
   
   private static final Random RAN = new Random();
   private static final int NFILES = 20;
@@ -255,11 +260,11 @@ public class TestCopyFiles extends TestC
   /** copy files from local file system to local file system */
   public void testCopyFromLocalToLocal() throws Exception {
     Configuration conf = new Configuration();
-    FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
-    MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+    FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
+    MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat");
     ToolRunner.run(new DistCp(new Configuration()),
-                           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
+                           new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+                                         LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
                checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
     deldir(localfs, TEST_ROOT_DIR+"/destdat");
@@ -305,11 +310,11 @@ public class TestCopyFiles extends TestC
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
-        MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+        MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat");
         ToolRunner.run(new DistCp(conf), new String[] {
                                          "-log",
                                          namenode+"/logs",
-                                         "file:///"+TEST_ROOT_DIR+"/srcdat",
+                                         LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
                                          namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(cluster.getFileSystem(), "/destdat", files));
@@ -317,7 +322,7 @@ public class TestCopyFiles extends TestC
                     hdfs.exists(new Path(namenode+"/logs")));
         deldir(hdfs, "/destdat");
         deldir(hdfs, "/logs");
-        deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
+        deldir(FileSystem.get(LOCAL_FS_URI, conf), TEST_ROOT_DIR+"/srcdat");
       }
     } finally {
       if (cluster != null) { cluster.shutdown(); }
@@ -329,7 +334,7 @@ public class TestCopyFiles extends TestC
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
+      final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
       cluster = new MiniDFSCluster(conf, 1, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = FileSystem.getDefaultUri(conf).toString();
@@ -339,7 +344,7 @@ public class TestCopyFiles extends TestC
                                          "-log",
                                          "/logs",
                                          namenode+"/srcdat",
-                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
+                                         LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
         assertTrue("Log directory does not exist.",
@@ -413,7 +418,7 @@ public class TestCopyFiles extends TestC
     }
   }
 
- public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+  public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
@@ -488,21 +493,119 @@ public class TestCopyFiles extends TestC
     }
   }
 
+  /**
+   * A helper function to test copying files between local file system and dfs
+   * file system, with staging area set to local file system. 
+   */
+  private void stagingAreaTest(final FileSystem srcFs, final FileSystem destFs,
+      MiniDFSCluster cluster, Configuration conf) throws Exception {
+    try {
+      final String fileDir = "/files";
+      final String srcParent = "/srcdat";
+      final String destParent = "/destdata";
+      final String source = srcParent + fileDir;
+      final String destination = destParent + fileDir;
+      final String logs = "/logs";
+      String logDir = TEST_ROOT_DIR + logs;
+
+      URI srcUri = srcFs.getUri();
+      URI destUri = destFs.getUri();
+
+      final boolean isSrcLocalFs = srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
+
+      final FileSystem localFs = FileSystem.get(LOCAL_FS_URI, conf);
+      String prevStagingArea =
+          conf.get(JT_STAGING_AREA_ROOT, JT_STAGING_AREA_ROOT_DEFAULT);
+      String newStagingArea = (isSrcLocalFs? source : destination);
+      newStagingArea += "/STAGING";
+      conf.set(JT_STAGING_AREA_ROOT, TEST_ROOT_DIR + newStagingArea);
+        
+      final String srcParentPrefix = isSrcLocalFs? TEST_ROOT_DIR : "";
+      final String destParentPrefix = isSrcLocalFs? "" : TEST_ROOT_DIR;
+ 
+      String createDelSrcParent = srcParentPrefix + srcParent;
+      String createDelDestParent = destParentPrefix + destParent;
+      String createDelSrc = createDelSrcParent + fileDir;
+      String createDelDest = createDelDestParent + fileDir;
+      
+      MyFile[] srcFiles = createFiles(srcUri, createDelSrc);
+      createFiles(destUri, createDelDest);
+
+      String distcpSrc = String.valueOf(srcUri) + createDelSrc;     
+      String distcpDest = String.valueOf(destUri) + createDelDest;
+      
+      ToolRunner.run(new DistCp(conf), new String[] {
+        "-log",
+        LOCAL_FS_STR + logDir,
+        "-update",
+        "-delete",
+        distcpSrc,
+        distcpDest});
+          
+        assertTrue("Source and destination directories do not match.",
+            checkFiles(destFs, createDelDest, srcFiles));
+
+        deldir(localFs, logDir);
+        deldir(srcFs, createDelSrcParent);
+        deldir(destFs, createDelDestParent);
+
+        conf.set(JT_STAGING_AREA_ROOT, prevStagingArea); 
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * test copying files from local file system to dfs file system with staging
+   * area in src
+   */
+  public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+
+    String namenode = FileSystem.getDefaultUri(conf).toString();
+    assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
+    
+    final FileSystem srcFs = FileSystem.get(LOCAL_FS_URI, conf);
+    final FileSystem destFs = cluster.getFileSystem();
+    
+    stagingAreaTest(srcFs, destFs, cluster, conf);
+  }
+
+  /**
+   * test copying files from dfs file system to local file system with staging
+   * area in dest
+   */
+  public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    
+    String namenode = FileSystem.getDefaultUri(conf).toString();
+    assertTrue("Name node doesn't start with hdfs://", namenode.startsWith("hdfs://"));
+    
+    final FileSystem srcFs = cluster.getFileSystem();
+    final FileSystem destFs = FileSystem.get(LOCAL_FS_URI, conf);
+    
+    stagingAreaTest(srcFs, destFs, cluster, conf);
+  }
+
   public void testCopyDuplication() throws Exception {
-    final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration());
+    final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, new Configuration());
     try {    
       MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
+          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat"});
       assertTrue("Source and destination directories do not match.",
                  checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files));
   
       assertEquals(DistCp.DuplicationException.ERROR_CODE,
           ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/destdat",}));
+          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat",
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat",}));
     }
     finally {
       deldir(localfs, TEST_ROOT_DIR+"/destdat");
@@ -512,14 +615,14 @@ public class TestCopyFiles extends TestC
   }
 
   public void testCopySingleFile() throws Exception {
-    FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
+    FileSystem fs = FileSystem.get(LOCAL_FS_URI, new Configuration());
     Path root = new Path(TEST_ROOT_DIR+"/srcdat");
     try {    
       MyFile[] files = {createFile(root, fs)};
       //copy a dir with a single file
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/destdat"});
+          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
       assertTrue("Source and destination directories do not match.",
                  checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
       
@@ -528,8 +631,8 @@ public class TestCopyFiles extends TestC
       Path p = new Path(root, fname);
       FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
       ToolRunner.run(new DistCp(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
-                        "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
+          new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+fname,
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"+fname});
       assertTrue("Source and destination directories do not match.",
           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));     
       //copy single file to existing dir
@@ -539,16 +642,16 @@ public class TestCopyFiles extends TestC
       String sname = files2[0].getName();
       ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"-update",
-                        "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
-                        "file:///"+TEST_ROOT_DIR+"/dest2/"});
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
       assertTrue("Source and destination directories do not match.",
           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
       updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
       //copy single file to existing dir w/ dst name conflict
       ToolRunner.run(new DistCp(new Configuration()),
           new String[] {"-update",
-                        "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
-                        "file:///"+TEST_ROOT_DIR+"/dest2/"});
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
+                        LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
       assertTrue("Source and destination directories do not match.",
           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
     }
@@ -917,7 +1020,7 @@ public class TestCopyFiles extends TestC
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
+      final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
       cluster = new MiniDFSCluster(conf, 1, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = FileSystem.getDefaultUri(conf).toString();
@@ -927,7 +1030,7 @@ public class TestCopyFiles extends TestC
         MyFile[] localFiles = createFiles(localfs, destdir);
         ToolRunner.run(new DistCp(conf), new String[] { "-delete", "-update",
             "-log", "/logs", namenode + "/srcdat",
-            "file:///" + TEST_ROOT_DIR + "/destdat" });
+            LOCAL_FS_STR + TEST_ROOT_DIR + "/destdat" });
         assertTrue("Source and destination directories do not match.",
             checkFiles(localfs, destdir, files));
         assertTrue("Log directory does not exist.",

Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java Tue Jan  7 00:24:09 2014
@@ -1091,14 +1091,21 @@ public class DistCp implements Tool {
       (args.srcs.size() == 1 && !dstExists) || update || overwrite;
     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
     long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
+    final FileStatus jobDirStat = jobfs.getFileStatus(jobDirectory);
     try {
       for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
         final Path src = srcItr.next();
         FileSystem srcfs = src.getFileSystem(conf);
         FileStatus srcfilestat = srcfs.getFileStatus(src);
         Path root = special && srcfilestat.isDir()? src: src.getParent();
+        final boolean needToFilterJobDir = srcfs.equals(jobfs);
+
         if (srcfilestat.isDir()) {
-          ++srcCount;
+          if (needToFilterJobDir && (srcfilestat.compareTo(jobDirStat) == 0)) {
+            continue;
+          } else {
+            ++srcCount;
+          }
         }
 
         Stack<FileStatus> pathstack = new Stack<FileStatus>();
@@ -1109,12 +1116,17 @@ public class DistCp implements Tool {
             boolean skipfile = false;
             final FileStatus child = children[i]; 
             final String dst = makeRelative(root, child.getPath());
-            ++srcCount;
 
             if (child.isDir()) {
-              pathstack.push(child);
+              if (needToFilterJobDir && (child.compareTo(jobDirStat) == 0)) {
+                continue;
+              } else {
+                ++srcCount;
+                pathstack.push(child);
+              }
             }
             else {
+              ++srcCount;
               //skip file if the src and the dst files are the same.
               skipfile = update && 
                 sameFile(srcfs, child, dstfs, 
@@ -1272,17 +1284,22 @@ public class DistCp implements Tool {
           + ") is not a directory.");
     }
 
-    //write dst lsr results
+    // write dst lsr results
+    final boolean needToFilterJobDir = dstfs.equals(jobfs); 
+    final FileStatus jobDirStat = jobfs.getFileStatus(jobdir);   
     final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
     final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
         dstlsr, Text.class, dstroot.getClass(),
         SequenceFile.CompressionType.NONE);
     try {
-      //do lsr to get all file statuses in dstroot
+      // do lsr to get all file statuses in dstroot
       final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
       for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
         final FileStatus status = lsrstack.pop();
         if (status.isDir()) {
+          if (needToFilterJobDir && (status.compareTo(jobDirStat) == 0)) {
+            continue;
+          }      
           for(FileStatus child : dstfs.listStatus(status.getPath())) {
             String relative = makeRelative(dstroot.getPath(), child.getPath());
             writer.append(new Text(relative), child);
@@ -1294,20 +1311,21 @@ public class DistCp implements Tool {
       checkAndClose(writer);
     }
 
-    //sort lsr results
+    // sort lsr results
     final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
     SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
         new Text.Comparator(), Text.class, FileStatus.class, jobconf);
     sorter.sort(dstlsr, sortedlsr);
 
-    //compare lsr list and dst list  
+    // compare lsr list and dst list
+    final String jobDirStr = jobdir.toString();
     SequenceFile.Reader lsrin = null;
     SequenceFile.Reader dstin = null;
     try {
       lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
       dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
 
-      //compare sorted lsr list and sorted dst list
+      // compare sorted lsr list and sorted dst list
       final Text lsrpath = new Text();
       final FileStatus lsrstatus = new FileStatus();
       final Text dstpath = new Text();
@@ -1317,6 +1335,12 @@ public class DistCp implements Tool {
 
       boolean hasnext = dstin.next(dstpath, dstfrom);
       for(; lsrin.next(lsrpath, lsrstatus); ) {
+        //
+        // check if lsrpath is in dst (represented here by dstsorted, which
+        // contains files and dirs to be copied from the source to destination),
+        // delete it if it doesn't exist in dst AND it's not jobDir or jobDir's
+        // ancestor.
+        //
         int dst_cmp_lsr = dstpath.compareTo(lsrpath);
         for(; hasnext && dst_cmp_lsr < 0; ) {
           hasnext = dstin.next(dstpath, dstfrom);
@@ -1324,12 +1348,22 @@ public class DistCp implements Tool {
         }
         
         if (dst_cmp_lsr == 0) {
-          //lsrpath exists in dst, skip it
+          // lsrpath exists in dst, skip it
           hasnext = dstin.next(dstpath, dstfrom);
         }
         else {
-          //lsrpath does not exist, delete it
+          // lsrpath does not exist in dst, delete it if it's not jobDir or
+          // jobDir's ancestor
           String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
+          if (needToFilterJobDir) {
+            int cmpJobDir = s.compareTo(jobDirStr);
+            if (cmpJobDir > 0) {
+              // do nothing
+            } else if (cmpJobDir == 0 || isAncestorPath(s, jobDirStr)) {
+              continue;
+            }
+          }
+  
           if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
             shellargs[1] = s;
             int r = 0;


Reply via email to