Hi Tim,

Thank you very much for reporting the bug and providing a fix for it, greatly appreciated! Would you mind creating a JIRA [1] and attaching your patch there? Unfortunately, due to legal restrictions, we can't accept patches sent by email.

Jarcec

Links:
1: https://issues.apache.org/jira/browse/SQOOP

On Thu, Jun 13, 2013 at 12:46:48PM -0400, Tim Howe wrote:
> Hello all,
>
> I was recently tasked with looking into a problem with Sqoop's
> incremental import on our installation: every import after the first
> would report success, but the data never appeared. A temporary file
> containing the data was created on HDFS, but it was deleted on
> completion instead of being moved into place.
>
> It turned out to be a conflict between the "direct mode" database
> manager (for PostgreSQL, in this case) and "incremental mode" import.
> Ordinarily Sqoop creates files named part-m-nnnnn, where nnnnn is an
> incrementing file partition number. However, the direct-mode importer
> creates files named data-nnnnn. This is a problem because AppendUtils,
> which moves files into place at the end of an incremental import, only
> moves files matching the part-m-nnnnn format and discards the rest.
>
> I've written a patch that makes direct imports use the same naming
> convention as everything else. Also attached are some changes to
> AppendUtils that improve resiliency, especially when multiple
> concurrent operations run against the same table. The patch is against
> sqoop-1.3.0-cdh3u3, but it seems to apply and build with minimal
> changes across the whole 1.x series.
>
> Please let me know if anyone finds this useful or has any further
> suggestions. In particular, I am curious where the "part-m-nnnnn"
> naming comes from and whether the "-m" signifies anything. I hunted
> around for the code that creates those files, but with no luck.
>
> Thanks and regards,
> --
> Tim Howe
> Data Warehouse
> TripAdvisor
> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2012-01-26 13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2013-06-13 11:31:22.476128082 -0400
> @@ -51,6 +51,8 @@
>    private static final String FILEPART_SEPARATOR = "-";
>    private static final String FILEEXT_SEPARATOR = ".";
>
> +  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");
> +
>    private ImportJobContext context = null;
>
>    public AppendUtils(ImportJobContext context) {
> @@ -118,11 +120,10 @@
>      int nextPartition = 0;
>      FileStatus[] existingFiles = fs.listStatus(targetDir);
>      if (existingFiles != null && existingFiles.length > 0) {
> -      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
>        for (FileStatus fileStat : existingFiles) {
>          if (!fileStat.isDir()) {
>            String filename = fileStat.getPath().getName();
> -          Matcher mat = patt.matcher(filename);
> +          Matcher mat = DATA_PART_PATTERN.matcher(filename);
>            if (mat.matches()) {
>              int thisPart = Integer.parseInt(mat.group(1));
>              if (thisPart >= nextPartition) {
> @@ -142,52 +143,94 @@
>    }
>
>    /**
> -   * Move files from source to target using a specified starting partition.
> +   * Move selected files from source to target using a specified starting partition.
> +   *
> +   * Directories are moved without restriction. Note that the serial
> +   * number of directories bears no relation to the file partition
> +   * numbering.
>     */
>    private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
>        int partitionStart) throws IOException {
>
> -    NumberFormat numpart = NumberFormat.getInstance();
> -    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
> -    numpart.setGroupingUsed(false);
> -    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
> -    FileStatus[] tempFiles = fs.listStatus(sourceDir);
> +    /* list files in the source dir and check for errors */
> +
> +    FileStatus[] sourceFiles = fs.listStatus(sourceDir);
>
> -    if (null == tempFiles) {
> +    if (null == sourceFiles) {
>        // If we've already checked that the dir exists, and now it can't be
>        // listed, this is a genuine error (permissions, fs integrity, or other).
>        throw new IOException("Could not list files from " + sourceDir);
>      }
>
> -    // Move and rename files & directories from temporary to target-dir thus
> -    // appending file's next partition
> -    for (FileStatus fileStat : tempFiles) {
> -      if (!fileStat.isDir()) {
> -        // Move imported data files
> -        String filename = fileStat.getPath().getName();
> -        Matcher mat = patt.matcher(filename);
> -        if (mat.matches()) {
> -          String name = getFilename(filename);
> -          String fileToMove = name.concat(numpart.format(partitionStart++));
> -          String extension = getFileExtension(filename);
> -          if (extension != null) {
> -            fileToMove = fileToMove.concat(extension);
> -          }
> -          LOG.debug("Filename: " + filename + " repartitioned to: "
> -              + fileToMove);
> -          fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
> -        }
> -      } else {
> -        // Move directories (_logs & any other)
> -        String dirName = fileStat.getPath().getName();
> -        Path path = new Path(targetDir, dirName);
> -        int dirNumber = 0;
> -        while (fs.exists(path)) {
> -          path = new Path(targetDir, dirName.concat("-").concat(
> -              numpart.format(dirNumber++)));
> +
> +    /* state used throughout the entire move operation */
> +
> +    // pad the data partition number thusly
> +    NumberFormat partFormat = NumberFormat.getInstance();
> +    partFormat.setMinimumIntegerDigits(PARTITION_DIGITS);
> +    partFormat.setGroupingUsed(false);
> +
> +    // where the data partitioning is currently at
> +    int dataPart = partitionStart;
> +
> +
> +    /* loop through all top-level files and copy matching ones */
> +
> +    for (FileStatus fileStatus : sourceFiles) {
> +      String sourceFilename = fileStatus.getPath().getName();
> +      StringBuilder destFilename = new StringBuilder();
> +
> +      if (fileStatus.isDir()) { // move all subdirectories
> +        // pass target dir as initial dest to prevent nesting inside preexisting dir
> +        if (fs.rename(fileStatus.getPath(), targetDir)) {
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
> +        } else {
> +          int dirNumber = 0;
> +          Path destPath;
> +          do {
> +            // clear the builder in case this isn't the first iteration
> +            destFilename.setLength(0);
> +
> +            // name-nnnnn?
> +            destFilename
> +                .append(sourceFilename)
> +                .append("-")
> +                .append(partFormat.format(dirNumber++));
> +
> +            destPath = new Path(targetDir, destFilename.toString());
> +            if (fs.exists(destPath))
> +              continue;
> +
> +            /*
> +             * There's still a race condition right here if an
> +             * identically-named directory is created concurrently.
> +             * It can be avoided by creating a parent dir for all
> +             * migrated dirs, or by an intermediate rename.
> +             */
> +
> +          } while (!fs.rename(fileStatus.getPath(), destPath));
> +
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
>          }
> -        LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
> -        fs.rename(fileStat.getPath(), path);
> +      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) { // move only matching top-level files
> +        do {
> +          // clear the builder in case this isn't the first iteration
> +          destFilename.setLength(0);
> +
> +          // name-nnnnn
> +          destFilename
> +              .append(getFilename(sourceFilename))
> +              .append(partFormat.format(dataPart++));
> +
> +          // .ext?
> +          String extension = getFileExtension(sourceFilename);
> +          if (extension != null)
> +            destFilename.append(getFileExtension(sourceFilename));
> +        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
> +
> +        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
> +      } else { // ignore everything else
> +        LOG.debug("Filename: " + sourceFilename + " ignored");
>        }
>      }
>    }
> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2012-01-26 13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2013-06-13 11:31:22.475128082 -0400
> @@ -88,7 +88,7 @@
>
>      // This Writer will be closed by the caller.
>      return new SplittableBufferedWriter(
> -        new SplittingOutputStream(conf, destDir, "data-",
> +        new SplittingOutputStream(conf, destDir, "part-m-",
>              options.getDirectSplitSize(), getCodec(conf, options)));
>    }
>
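For illustration, here is a minimal, self-contained sketch of the file selection and renumbering discussed in this thread, assuming plain filename strings in place of HDFS directory listings. `AppendNamingSketch`, `nextPartition` and `renumber` are hypothetical names, not Sqoop's API; only the regular expression (the patch's `DATA_PART_PATTERN`) and the five-digit zero padding come from the patch. It shows why direct-mode `data-nnnnn` files are never moved: they do not match the pattern, so nothing is selected, whereas `part-m-nnnnn` files are renumbered after the highest partition already present in the target directory.

```java
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of how AppendUtils selects and renumbers files.
 * Plain filename strings stand in for HDFS directory listings.
 */
public class AppendNamingSketch {
  static final int PARTITION_DIGITS = 5;

  // The pattern the patch introduces as DATA_PART_PATTERN.
  static final Pattern DATA_PART_PATTERN =
      Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");

  /** One past the highest partition number already present in the target dir. */
  static int nextPartition(List<String> existing) {
    int next = 0;
    for (String name : existing) {
      Matcher m = DATA_PART_PATTERN.matcher(name);
      if (m.matches()) {
        next = Math.max(next, Integer.parseInt(m.group(1)) + 1);
      }
    }
    return next;
  }

  /** New names for incoming files; names that do not match are skipped. */
  static List<String> renumber(List<String> incoming, int start) {
    // Locale.ROOT pins ASCII digits in this sketch; Sqoop uses the default locale.
    NumberFormat fmt = NumberFormat.getInstance(Locale.ROOT);
    fmt.setMinimumIntegerDigits(PARTITION_DIGITS);
    fmt.setGroupingUsed(false);

    List<String> out = new ArrayList<>();
    int part = start;
    for (String name : incoming) {
      Matcher m = DATA_PART_PATTERN.matcher(name);
      if (m.matches()) {
        // Keep the prefix ("part-m-") and any suffix (".gz"); replace the number.
        out.add(name.substring(0, m.start(1)) + fmt.format(part++)
            + name.substring(m.end(1)));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    int start = nextPartition(List.of("part-m-00000", "part-m-00001", "_logs"));
    System.out.println(start);
    System.out.println(renumber(List.of("data-00000", "data-00001"), start));
    System.out.println(renumber(List.of("part-m-00000", "part-m-00001.gz"), start));
  }
}
```

Running `main` prints `2`, then `[]` for the `data-` files (the silently dropped data from the report), then `[part-m-00002, part-m-00003.gz]`.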
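One subtlety in the directory branch of the patch: in Java, `continue` inside a `do`/`while` body jumps to the loop condition, not back to the top of the body. So when `fs.exists(destPath)` is true, the loop still evaluates `fs.rename(fileStatus.getPath(), destPath)` against a path that already exists, and since Hadoop's `rename` onto an existing directory moves the source inside it, this can reintroduce the nesting the loop is meant to avoid. The sketch below is a hypothetical alternative, not Sqoop code: `UniqueDirMove` and `moveWithSuffix` are illustrative names, and `java.nio.file.Files.move` on a local disk stands in for `FileSystem.rename`, because `Files.move` without `REPLACE_EXISTING` reports a collision instead of nesting.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

/**
 * Hypothetical sketch: move a directory into targetDir as name, then
 * name-00000, name-00001, ... until a free slot is found. java.nio.file on
 * a local disk stands in for Hadoop's FileSystem here.
 */
public class UniqueDirMove {
  static Path moveWithSuffix(Path src, Path targetDir) throws IOException {
    String name = src.getFileName().toString();
    Path dest = targetDir.resolve(name);
    int n = 0;
    while (true) {
      try {
        // Without REPLACE_EXISTING, Files.move refuses an existing target
        // rather than nesting src inside it.
        return Files.move(src, dest);
      } catch (FileAlreadyExistsException e) {
        // Only a name collision triggers a retry; any other IOException
        // propagates instead of advancing the counter forever.
        dest = targetDir.resolve(String.format(Locale.ROOT, "%s-%05d", name, n++));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("move-sketch");
    Path target = Files.createDirectory(root.resolve("target"));
    Files.createDirectory(target.resolve("_logs")); // name already taken
    Path src = Files.createDirectory(root.resolve("_logs"));
    System.out.println(root.relativize(moveWithSuffix(src, target)));
  }
}
```

The design choice here is to retry only on a name collision. The patch's loops retry whenever `rename` returns `false`, which may also happen for reasons other than a taken name (for example, if the source has already disappeared), in which case the counter keeps climbing without ever succeeding.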
