Hello all,

I was recently tasked with looking into a problem with Sqoop's
incremental import on our installation: any import after the first
would report success, but the data would never appear.  A temporary
file containing the data was created on HDFS, but it was deleted upon
completion rather than being moved into place.

It turned out to be a conflict between the "direct mode" database
manager (for PostgreSQL, in this case) and "incremental mode" import.
Ordinarily, Sqoop creates files named part-m-nnnnn, where nnnnn is an
incrementing file partition number.  However, the direct-mode importer
creates files of the form data-nnnnn.  This poses a problem because
AppendUtils, which moves files into place at the end of an incremental
import, only moves files that match the part-m-nnnnn format and
discards the rest.

I've written a patch that makes direct imports use the same naming
convention used elsewhere.  Also attached are some changes to
AppendUtils that improve resiliency, especially when there are
multiple concurrent operations on the same table.  The patch is against
sqoop-1.3.0-cdh3u3 but seems to apply and build with minimal changes
across the whole 1.x series.

Please let me know if anyone finds this useful or if you have any
further suggestions.  In particular, I am curious where the
"part-m-nnnnn" naming comes from and whether the "-m" signifies
anything.  I did hunt around for the code that creates those files,
but with no luck.

Thanks and regards,
-- 
Tim Howe
Data Warehouse
TripAdvisor
diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java
--- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2012-01-26 13:42:29.000000000 -0500
+++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2013-06-13 11:31:22.476128082 -0400
@@ -51,6 +51,8 @@
   private static final String FILEPART_SEPARATOR = "-";
   private static final String FILEEXT_SEPARATOR = ".";
 
+  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");
+
   private ImportJobContext context = null;
 
   public AppendUtils(ImportJobContext context) {
@@ -118,11 +120,10 @@
     int nextPartition = 0;
     FileStatus[] existingFiles = fs.listStatus(targetDir);
     if (existingFiles != null && existingFiles.length > 0) {
-      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
       for (FileStatus fileStat : existingFiles) {
         if (!fileStat.isDir()) {
           String filename = fileStat.getPath().getName();
-          Matcher mat = patt.matcher(filename);
+          Matcher mat = DATA_PART_PATTERN.matcher(filename);
           if (mat.matches()) {
             int thisPart = Integer.parseInt(mat.group(1));
             if (thisPart >= nextPartition) {
@@ -142,52 +143,94 @@
   }
 
   /**
-   * Move files from source to target using a specified starting partition.
+   * Move selected files from source to target using a specified starting partition.
+   *
+   * Directories are moved without restriction.  Note that the serial
+   * number of directories bears no relation to the file partition
+   * numbering.
    */
   private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
       int partitionStart) throws IOException {
 
-    NumberFormat numpart = NumberFormat.getInstance();
-    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
-    numpart.setGroupingUsed(false);
-    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
-    FileStatus[] tempFiles = fs.listStatus(sourceDir);
+    /* list files in the source dir and check for errors */
+
+    FileStatus[] sourceFiles = fs.listStatus(sourceDir);
 
-    if (null == tempFiles) {
+    if (null == sourceFiles) {
       // If we've already checked that the dir exists, and now it can't be
       // listed, this is a genuine error (permissions, fs integrity, or other).
       throw new IOException("Could not list files from " + sourceDir);
     }
 
-    // Move and rename files & directories from temporary to target-dir thus
-    // appending file's next partition
-    for (FileStatus fileStat : tempFiles) {
-      if (!fileStat.isDir()) {
-        // Move imported data files
-        String filename = fileStat.getPath().getName();
-        Matcher mat = patt.matcher(filename);
-        if (mat.matches()) {
-          String name = getFilename(filename);
-          String fileToMove = name.concat(numpart.format(partitionStart++));
-          String extension = getFileExtension(filename);
-          if (extension != null) {
-            fileToMove = fileToMove.concat(extension);
-          }
-          LOG.debug("Filename: " + filename + " repartitioned to: "
-              + fileToMove);
-          fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
-        }
-      } else {
-        // Move directories (_logs & any other)
-        String dirName = fileStat.getPath().getName();
-        Path path = new Path(targetDir, dirName);
-        int dirNumber = 0;
-        while (fs.exists(path)) {
-          path = new Path(targetDir, dirName.concat("-").concat(
-              numpart.format(dirNumber++)));
+
+    /* state used throughout the entire move operation */
+
+    // pad the data partition number thusly
+    NumberFormat partFormat = NumberFormat.getInstance();
+    partFormat.setMinimumIntegerDigits(PARTITION_DIGITS);
+    partFormat.setGroupingUsed(false);
+
+    // where the data partitioning is currently at
+    int dataPart = partitionStart;
+
+
+    /* loop through all top-level files and copy matching ones */
+
+    for (FileStatus fileStatus : sourceFiles) {
+      String        sourceFilename = fileStatus.getPath().getName();
+      StringBuilder destFilename   = new StringBuilder();
+
+      if (fileStatus.isDir()) {    // move all subdirectories
+        // pass target dir as initial dest to prevent nesting inside preexisting dir
+        if (fs.rename(fileStatus.getPath(), targetDir)) {    
+          LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
+        } else {
+          int dirNumber = 0;
+          Path destPath;
+          do {
+            // clear the builder in case this isn't the first iteration
+            destFilename.setLength(0);
+
+            // name-nnnnn?
+            destFilename
+              .append(sourceFilename)
+              .append("-")
+              .append(partFormat.format(dirNumber++));
+
+            destPath = new Path(targetDir, destFilename.toString());
+            if (fs.exists(destPath))
+              continue;
+
+            /*
+             * There's still a race condition right here if an
+             * identically-named directory is created concurrently.
+             * It can be avoided by creating a parent dir for all
+             * migrated dirs, or by an intermediate rename.
+             */
+
+          } while (!fs.rename(fileStatus.getPath(), destPath));
+
+          LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
         }
-        LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
-        fs.rename(fileStat.getPath(), path);
+      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) {    // move only matching top-level files
+        do {
+          // clear the builder in case this isn't the first iteration
+          destFilename.setLength(0);
+
+          // name-nnnnn
+          destFilename
+            .append(getFilename(sourceFilename))
+            .append(partFormat.format(dataPart++));
+
+          // .ext?
+          String extension = getFileExtension(sourceFilename);
+          if (extension != null)
+            destFilename.append(getFileExtension(sourceFilename));
+        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
+
+        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
+      } else {    // ignore everything else
+        LOG.debug("Filename: " + sourceFilename + " ignored");
       }
     }
   }
diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
--- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2012-01-26 13:42:29.000000000 -0500
+++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2013-06-13 11:31:22.475128082 -0400
@@ -88,7 +88,7 @@
 
     // This Writer will be closed by the caller.
     return new SplittableBufferedWriter(
-        new SplittingOutputStream(conf, destDir, "data-",
+        new SplittingOutputStream(conf, destDir, "part-m-",
         options.getDirectSplitSize(), getCodec(conf, options)));
   }
 

Reply via email to