Author: amareshwari
Date: Fri Mar 22 10:09:06 2013
New Revision: 1459690

URL: http://svn.apache.org/r1459690
Log:
MAPREDUCE-5014. Extend DistCp to accept a custom CopyListing. Contributed by Srikanth Sundarrajan.

Modified:
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java Fri Mar 22 10:09:06 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.tools.util.Dist
 import org.apache.hadoop.security.Credentials;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 
 /**
  * The CopyListing abstraction is responsible for how the list of
@@ -193,14 +194,34 @@ public abstract class CopyListing extend
    * @param credentials Credentials object on which the FS delegation tokens are cached
    * @param options The input Options, to help choose the appropriate CopyListing Implementation.
    * @return An instance of the appropriate CopyListing implementation.
+   * @throws java.io.IOException - if the CopyListing implementation cannot be instantiated
    */
   public static CopyListing getCopyListing(Configuration configuration,
                                            Credentials credentials,
-                                           DistCpOptions options) {
-    if (options.getSourceFileListing() == null) {
-      return new GlobbedCopyListing(configuration, credentials);
-    } else {
-      return new FileBasedCopyListing(configuration, credentials);
+                                           DistCpOptions options)
+      throws IOException {
+
+    String copyListingClassName = configuration.get(DistCpConstants.
+        CONF_LABEL_COPY_LISTING_CLASS, "");
+    Class<? extends CopyListing> copyListingClass;
+    try {
+      if (! copyListingClassName.isEmpty()) {
+        copyListingClass = configuration.getClass(DistCpConstants.
+            CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
+            CopyListing.class);
+      } else {
+        if (options.getSourceFileListing() == null) {
+            copyListingClass = GlobbedCopyListing.class;
+        } else {
+            copyListingClass = FileBasedCopyListing.class;
+        }
+      }
+      copyListingClassName = copyListingClass.getName();
+      Constructor<? extends CopyListing> constructor = copyListingClass.
+          getDeclaredConstructor(Configuration.class, Credentials.class);
+      return constructor.newInstance(configuration, credentials);
+    } catch (Exception e) {
+      throw new IOException("Unable to instantiate " + copyListingClassName, 
e);
     }
   }
 

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java Fri Mar 22 10:09:06 2013
@@ -319,7 +319,7 @@ public class DistCp extends Configured i
    * @return Returns the path where the copy listing is created
    * @throws IOException - If any
    */
-  private Path createInputFileListing(Job job) throws IOException {
+  protected Path createInputFileListing(Job job) throws IOException {
     Path fileListingPath = getFileListingPath();
     CopyListing copyListing = 
CopyListing.getCopyListing(job.getConfiguration(),
         job.getCredentials(), inputOptions);
@@ -334,7 +334,7 @@ public class DistCp extends Configured i
    * @return - Path where the copy listing file has to be saved
    * @throws IOException - Exception if any
    */
-  private Path getFileListingPath() throws IOException {
+  protected Path getFileListingPath() throws IOException {
     String fileListPathStr = metaFolder + "/fileList.seq";
     Path path = new Path(fileListPathStr);
     return new Path(path.toUri().normalize().toString());

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Fri Mar 22 10:09:06 2013
@@ -82,6 +82,9 @@ public class DistCpConstants {
   /* Meta folder where the job's intermediate data is kept */
   public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
 
+  /* DistCp CopyListing class override param */
+  public static final String CONF_LABEL_COPY_LISTING_CLASS = 
"distcp.copy.listing.class";
+
   /**
    * Conf label for SSL Trust-store location.
    */

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java Fri Mar 22 10:09:06 2013
@@ -127,17 +127,20 @@ public class SimpleCopyListing extends C
             if (LOG.isDebugEnabled()) {
               LOG.debug("Recording source-path: " + sourceStatus.getPath() + " 
for copy.");
             }
-            writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, 
localFile);
+            writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
+                localFile, options);
 
             if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Traversing non-empty source dir: " + 
sourceStatus.getPath());
               }
-              traverseNonEmptyDirectory(fileListWriter, sourceStatus, 
sourcePathRoot, localFile);
+              traverseNonEmptyDirectory(fileListWriter, sourceStatus, 
sourcePathRoot,
+                  localFile, options);
             }
           }
         } else {
-          writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, 
localFile);
+          writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
+              localFile, options);
         }
       }
     } finally {
@@ -169,6 +172,17 @@ public class SimpleCopyListing extends C
     }
   }
 
+  /**
+   * Provide an option to skip copy of a path. Allows for exclusion
+   * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
+   * @param path - Path being considered for copy while building the file listing
+   * @param options - Input options passed during DistCp invocation
+   * @return - True if the path should be considered for copy, false otherwise
+   */
+  protected boolean shouldCopy(Path path, DistCpOptions options) {
+    return true;
+  }
+
   /** {@inheritDoc} */
   @Override
   protected long getBytesToCopy() {
@@ -210,7 +224,9 @@ public class SimpleCopyListing extends C
 
   private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
                                          FileStatus sourceStatus,
-                                         Path sourcePathRoot, boolean 
localFile)
+                                         Path sourcePathRoot,
+                                         boolean localFile,
+                                         DistCpOptions options)
                                          throws IOException {
     FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
     Stack<FileStatus> pathStack = new Stack<FileStatus>();
@@ -221,7 +237,8 @@ public class SimpleCopyListing extends C
         if (LOG.isDebugEnabled())
           LOG.debug("Recording source-path: "
                     + sourceStatus.getPath() + " for copy.");
-        writeToFileListing(fileListWriter, child, sourcePathRoot, localFile);
+        writeToFileListing(fileListWriter, child, sourcePathRoot,
+             localFile, options);
         if (isDirectoryAndNotEmpty(sourceFS, child)) {
           if (LOG.isDebugEnabled())
             LOG.debug("Traversing non-empty source dir: "
@@ -233,8 +250,10 @@ public class SimpleCopyListing extends C
   }
 
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
-                                  FileStatus fileStatus, Path sourcePathRoot,
-                                  boolean localFile) throws IOException {
+                                  FileStatus fileStatus,
+                                  Path sourcePathRoot,
+                                  boolean localFile,
+                                  DistCpOptions options) throws IOException {
     if (fileStatus.getPath().equals(sourcePathRoot) && 
fileStatus.isDirectory())
       return; // Skip the root-paths.
 
@@ -248,6 +267,10 @@ public class SimpleCopyListing extends C
       status = getFileStatus(fileStatus);
     }
 
+    if (!shouldCopy(fileStatus.getPath(), options)) {
+      return;
+    }
+
     fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
         fileStatus.getPath())), status);
     fileListWriter.sync();

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Fri Mar 22 10:09:06 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.Credentials;
@@ -79,7 +80,39 @@ public class TestCopyListing extends Sim
     return 0;
   }
 
-  @Test
+  @Test(timeout=10000)
+  public void testSkipCopy() throws Exception {
+    SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
+      @Override
+      protected boolean shouldCopy(Path path, DistCpOptions options) {
+        return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
+      }
+    };
+    FileSystem fs = FileSystem.get(getConf());
+    List<Path> srcPaths = new ArrayList<Path>();
+    srcPaths.add(new Path("/tmp/in4/1"));
+    srcPaths.add(new Path("/tmp/in4/2"));
+    Path target = new Path("/tmp/out4/1");
+    TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
+    TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
+    TestDistCpUtils.createFile(fs, "/tmp/in4/2");
+    fs.mkdirs(target);
+    DistCpOptions options = new DistCpOptions(srcPaths, target);
+    Path listingFile = new Path("/tmp/list4");
+    listing.buildListing(listingFile, options);
+    Assert.assertEquals(listing.getNumberOfPaths(), 2);
+    SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
+        SequenceFile.Reader.file(listingFile));
+    FileStatus fileStatus = new FileStatus();
+    Text relativePath = new Text();
+    Assert.assertTrue(reader.next(relativePath, fileStatus));
+    Assert.assertEquals(relativePath.toString(), "/1/file");
+    Assert.assertTrue(reader.next(relativePath, fileStatus));
+    Assert.assertEquals(relativePath.toString(), "/2");
+    Assert.assertFalse(reader.next(relativePath, fileStatus));
+  }
+
+  @Test(timeout=10000)
   public void testMultipleSrcToFile() {
     FileSystem fs = null;
     try {
@@ -124,7 +157,7 @@ public class TestCopyListing extends Sim
     }
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testDuplicates() {
     FileSystem fs = null;
     try {
@@ -150,7 +183,7 @@ public class TestCopyListing extends Sim
     }
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testBuildListing() {
     FileSystem fs = null;
     try {
@@ -206,7 +239,7 @@ public class TestCopyListing extends Sim
     }
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testBuildListingForSingleFile() {
     FileSystem fs = null;
     String testRootString = "/singleFileListing";

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java Fri Mar 22 10:09:06 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class TestIntegration {
@@ -68,7 +70,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testSingleFileMissingTarget() {
     caseSingleFileMissingTarget(false);
     caseSingleFileMissingTarget(true);
@@ -91,7 +93,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testSingleFileTargetFile() {
     caseSingleFileTargetFile(false);
     caseSingleFileTargetFile(true);
@@ -114,7 +116,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testSingleFileTargetDir() {
     caseSingleFileTargetDir(false);
     caseSingleFileTargetDir(true);
@@ -138,7 +140,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testSingleDirTargetMissing() {
     caseSingleDirTargetMissing(false);
     caseSingleDirTargetMissing(true);
@@ -161,7 +163,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testSingleDirTargetPresent() {
 
     try {
@@ -180,7 +182,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testUpdateSingleDirTargetPresent() {
 
     try {
@@ -199,7 +201,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testMultiFileTargetPresent() {
     caseMultiFileTargetPresent(false);
     caseMultiFileTargetPresent(true);
@@ -223,7 +225,56 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
+  public void testCustomCopyListing() {
+
+    try {
+      addEntries(listFile, "multifile1/file3", "multifile1/file4", 
"multifile1/file5");
+      createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
+      mkdirs(target.toString());
+
+      Configuration conf = getConf();
+      try {
+        conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
+            CustomCopyListing.class, CopyListing.class);
+        DistCpOptions options = new DistCpOptions(Arrays.
+            asList(new Path(root + "/" + "multifile1")), target);
+        options.setSyncFolder(true);
+        options.setDeleteMissing(false);
+        options.setOverwrite(false);
+        try {
+          new DistCp(conf, options).execute();
+        } catch (Exception e) {
+          LOG.error("Exception encountered ", e);
+          throw new IOException(e);
+        }
+      } finally {
+        conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
+      }
+
+      checkResult(target, 2, "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  private static class CustomCopyListing extends SimpleCopyListing {
+
+    public CustomCopyListing(Configuration configuration,
+                             Credentials credentials) {
+      super(configuration, credentials);
+    }
+
+    @Override
+    protected boolean shouldCopy(Path path, DistCpOptions options) {
+      return !path.getName().equals("file3");
+    }
+  }
+
+  @Test(timeout=100000)
   public void testMultiFileTargetMissing() {
     caseMultiFileTargetMissing(false);
     caseMultiFileTargetMissing(true);
@@ -246,7 +297,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testMultiDirTargetPresent() {
 
     try {
@@ -265,7 +316,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testUpdateMultiDirTargetPresent() {
 
     try {
@@ -284,7 +335,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testMultiDirTargetMissing() {
 
     try {
@@ -304,7 +355,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testUpdateMultiDirTargetMissing() {
 
     try {
@@ -323,7 +374,7 @@ public class TestIntegration {
     }
   }
   
-  @Test
+  @Test(timeout=100000)
   public void testDeleteMissingInDestination() {
     
     try {
@@ -343,7 +394,7 @@ public class TestIntegration {
     }
   }
   
-  @Test
+  @Test(timeout=100000)
   public void testOverwrite() {
     byte[] contents1 = "contents1".getBytes();
     byte[] contents2 = "contents2".getBytes();
@@ -375,7 +426,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testGlobTargetMissingSingleLevel() {
 
     try {
@@ -398,7 +449,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testUpdateGlobTargetMissingSingleLevel() {
 
     try {
@@ -420,7 +471,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testGlobTargetMissingMultiLevel() {
 
     try {
@@ -444,7 +495,7 @@ public class TestIntegration {
     }
   }
 
-  @Test
+  @Test(timeout=100000)
   public void testUpdateGlobTargetMissingMultiLevel() {
 
     try {
@@ -468,7 +519,7 @@ public class TestIntegration {
     }
   }
   
-  @Test
+  @Test(timeout=100000)
   public void testCleanup() {
     try {
       Path sourcePath = new Path("noscheme:///file");


Reply via email to