Repository: hive

Updated Branches:
  refs/heads/branch-3 744316e73 -> 656af1411
  refs/heads/master dd343d5f0 -> 0b6967edd
HIVE-19215 : JavaUtils.AnyIdDirFilter ignores base_n directories (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b6967ed
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b6967ed
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b6967ed

Branch: refs/heads/master
Commit: 0b6967edd8e0b3e4955f0615e756d02e8295779e
Parents: dd343d5
Author: sergey <ser...@apache.org>
Authored: Wed Apr 25 11:27:40 2018 -0700
Committer: sergey <ser...@apache.org>
Committed: Wed Apr 25 11:27:40 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/JavaUtils.java    | 69 +-----------------
 .../apache/hadoop/hive/ql/exec/CopyTask.java    |  3 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 41 -----------
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 12 ++--
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 75 +++++++++++++++++++-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |  2 +-
 6 files changed, 84 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 45abd2f..e09dec1 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -28,8 +28,7 @@ import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +37,6 @@ import org.slf4j.LoggerFactory;
  * Hive.
  */
 public final class JavaUtils {
-
-  public static final String BASE_PREFIX = "base";
-  public static final String DELTA_PREFIX = "delta";
-  public static final String DELTA_DIGITS = "%07d";
-  public static final int DELTA_DIGITS_LEN = 7;
-  public static final String STATEMENT_DIGITS = "%04d";
   private static final Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
 
   private static final Method SUN_MISC_UTIL_RELEASE;
@@ -166,64 +159,4 @@ public final class JavaUtils {
   private JavaUtils() {
     // prevent instantiation
   }
-
-  public static Long extractWriteId(Path file) {
-    String fileName = file.getName();
-    String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000 or base_0000022
-    if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
-      LOG.debug("Cannot extract write ID for a MM table: " + file
-          + " (" + Arrays.toString(parts) + ")");
-      return null;
-    }
-    long writeId = -1;
-    try {
-      writeId = Long.parseLong(parts[1]);
-    } catch (NumberFormatException ex) {
-      LOG.debug("Cannot extract write ID for a MM table: " + file
-          + "; parsing " + parts[1] + " got " + ex.getMessage());
-      return null;
-    }
-    return writeId;
-  }
-
-  public static class IdPathFilter implements PathFilter {
-    private String baseDirName, deltaDirName;
-    private final boolean isDeltaPrefix;
-
-    public IdPathFilter(long writeId, int stmtId) {
-      String deltaDirName = null;
-      deltaDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
-          String.format(DELTA_DIGITS, writeId);
-      isDeltaPrefix = (stmtId < 0);
-      if (!isDeltaPrefix) {
-        deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId);
-      }
-
-      this.baseDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId);
-      this.deltaDirName = deltaDirName;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      String name = path.getName();
-      return name.equals(baseDirName) || (isDeltaPrefix && name.startsWith(deltaDirName))
-          || (!isDeltaPrefix && name.equals(deltaDirName));
-    }
-  }
-
-  public static class AnyIdDirFilter implements PathFilter {
-    @Override
-    public boolean accept(Path path) {
-      String name = path.getName();
-      //todo: what if this is a base?
-      if (!name.startsWith(DELTA_PREFIX + "_")) return false;
-      String idStr = name.substring(DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN);
-      try {
-        Long.parseLong(idStr);//what for? sanity check?
-      } catch (NumberFormatException ex) {
-        return false;
-      }
-      return true;
-    }
-  }
 }
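
For context on the fix: the removed JavaUtils.AnyIdDirFilter above only accepted names starting with "delta_", so a compacted directory such as base_0000022 was silently filtered out -- the bug named in HIVE-19215. A minimal standalone sketch of the old predicate, with the removed constants inlined (illustrative class, not real Hive code):

    import org.apache.hadoop.fs.Path;

    public class OldAnyIdDirFilterSketch {
      // Values copied from the removed JavaUtils constants.
      private static final String DELTA_PREFIX = "delta";
      private static final int DELTA_DIGITS_LEN = 7;

      // Old behavior: only delta_NNNNNNN... names pass the filter.
      static boolean oldAccept(Path path) {
        String name = path.getName();
        if (!name.startsWith(DELTA_PREFIX + "_")) {
          return false; // base_0000022 is rejected here
        }
        String idStr = name.substring(
            DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN);
        try {
          Long.parseLong(idStr); // sanity check that the segment is numeric
        } catch (NumberFormatException ex) {
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        System.out.println(oldAccept(new Path("/warehouse/t/delta_0000001_0000001_0000"))); // true
        System.out.println(oldAccept(new Path("/warehouse/t/base_0000022")));               // false: the bug
      }
    }
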

http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index ce683c8..b0ec5ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -112,7 +113,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
     if (!fs.exists(path)) return null;
     if (!isSourceMm) return matchFilesOneDir(fs, path, null);
     // Note: this doesn't handle list bucketing properly; neither does the original code.
-    FileStatus[] mmDirs = fs.listStatus(path, new JavaUtils.AnyIdDirFilter());
+    FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter());
     if (mmDirs == null || mmDirs.length == 0) return null;
     List<FileStatus> allFiles = new ArrayList<FileStatus>();
     for (FileStatus mmDir : mmDirs) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 757fc67..15e6c34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4384,47 +4384,6 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         + " to MM is not supported. Please re-create a table in the desired format.");
   }
 
-  private void handleRemoveMm(
-      Path path, ValidWriteIdList validWriteIdList, List<Path> result) throws HiveException {
-    // Note: doesn't take LB into account; that is not presently supported here (throws above).
-    try {
-      FileSystem fs = path.getFileSystem(conf);
-      for (FileStatus file : fs.listStatus(path)) {
-        Path childPath = file.getPath();
-        if (!file.isDirectory()) {
-          ensureDelete(fs, childPath, "a non-directory file");
-          continue;
-        }
-        Long writeId = JavaUtils.extractWriteId(childPath);
-        if (writeId == null) {
-          ensureDelete(fs, childPath, "an unknown directory");
-        } else if (!validWriteIdList.isWriteIdValid(writeId)) {
-          // Assume no concurrent active writes - we rely on locks here. We could check and fail.
-          ensureDelete(fs, childPath, "an uncommitted directory");
-        } else {
-          result.add(childPath);
-        }
-      }
-    } catch (IOException ex) {
-      throw new HiveException(ex);
-    }
-  }
-
-  private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException {
-    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("Deleting " + what + " " + path);
-    }
-    try {
-      if (!fs.delete(path, true)) {
-        throw new IOException("delete returned false");
-      }
-    } catch (Exception ex) {
-      String error = "Couldn't delete " + path + "; cannot remove MM setting from the table";
-      LOG.error(error, ex);
-      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
-    }
-  }
-
   private List<Task<?>> generateAddMmTasks(Table tbl, Long writeId) throws HiveException {
     // We will move all the files in the table/partition directories into the first MM
     // directory, then commit the first write ID.
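
The removed handleRemoveMm/ensureDelete pair classified each child of a table or partition directory by its write ID: non-directories and directories with unparsable IDs were deleted, directories whose write ID was not committed per the ValidWriteIdList were deleted, and only committed directories were kept. A condensed sketch of that decision, as a hypothetical free-standing helper that uses the extractWriteId now living in AcidUtils:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class MmChildClassifier {
      enum Fate { DELETE_FILE, DELETE_UNKNOWN, DELETE_UNCOMMITTED, KEEP }

      // Mirrors the branch structure of the removed handleRemoveMm.
      static Fate classify(FileStatus file, ValidWriteIdList validWriteIds) {
        if (!file.isDirectory()) {
          return Fate.DELETE_FILE;        // "a non-directory file"
        }
        Long writeId = AcidUtils.extractWriteId(file.getPath());
        if (writeId == null) {
          return Fate.DELETE_UNKNOWN;     // "an unknown directory"
        }
        if (!validWriteIds.isWriteIdValid(writeId)) {
          return Fate.DELETE_UNCOMMITTED; // "an uncommitted directory"
        }
        return Fate.KEEP;                 // committed data, added to the result
      }
    }
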

http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6395c31..2503543 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4076,7 +4076,7 @@ public final class Utilities {
       Boolean isBaseDir) throws IOException {
     int skipLevels = dpLevels + lbLevels;
     if (filter == null) {
-      filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+      filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     }
     if (skipLevels == 0) {
       return statusToPath(fs.listStatus(path, filter));
@@ -4084,7 +4084,7 @@ public final class Utilities {
     // TODO: for some reason, globStatus doesn't work for masks like "...blah/*/delta_0000007_0000007*"
     //       the last star throws it off. So, for now, if stmtId is missing use recursion.
     //       For the same reason, we cannot use it if we don't know isBaseDir. Currently, we don't
-    //       /want/ to know isBaseDir because that is error prone; so, it ends up never being used. 
+    //       /want/ to know isBaseDir because that is error prone; so, it ends up never being used.
     if (stmtId < 0 || isBaseDir == null
         || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
       return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
@@ -4183,7 +4183,7 @@ public final class Utilities {
   }
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
-      int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, long writeId, int stmtId,
+      int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId,
       Configuration conf) throws IOException {
     Path[] files = getMmDirectoryCandidates(
         fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, conf, null);
@@ -4250,7 +4250,7 @@ public final class Utilities {
     FileSystem fs = specPath.getFileSystem(hconf);
     Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
-      JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+      AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
           filter, writeId, stmtId, hconf);
       return;
@@ -4275,7 +4275,7 @@ public final class Utilities {
     }
 
     Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath);
-    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+    AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     if (isMmCtas && !fs.exists(specPath)) {
       Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
@@ -4405,7 +4405,7 @@ public final class Utilities {
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();
-      Long writeId = JavaUtils.extractWriteId(childPath);
+      Long writeId = AcidUtils.extractWriteId(childPath);
       if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) {
         Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath);
         if (result == null) {
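
The changes in this file just repoint callers from JavaUtils.IdPathFilter to AcidUtils.IdPathFilter. For reference, these are the directory names that filter is built to match, assuming AcidUtils' "delta_"/"base_" prefixes (as implied by the concatenation in the moved constructor below) and the %07d/%04d padding carried over from the removed constants (a small illustrative class, not Hive code):

    public class IdPathFilterNames {
      public static void main(String[] args) {
        long writeId = 7;
        // With a statement ID (here 2): IdPathFilter matches this delta name exactly.
        String deltaExact = String.format("delta_%07d_%07d_%04d", writeId, writeId, 2);
        // Without a statement ID (stmtId < 0): any name starting with this prefix matches.
        String deltaPrefix = String.format("delta_%07d_%07d", writeId, writeId);
        // The base directory for the same write ID always matches exactly.
        String base = String.format("base_%07d", writeId);
        System.out.println(deltaExact);  // delta_0000007_0000007_0002
        System.out.println(deltaPrefix); // delta_0000007_0000007
        System.out.println(base);        // base_0000007
      }
    }
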
http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fd84978..445e126 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,7 +18,21 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,6 +68,8 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -68,6 +84,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
 
+
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
  * are used by the compactor and cleaner and thus must be format agnostic.
@@ -1823,4 +1840,60 @@ public class AcidUtils {
     }
     return false;
   }
+
+  public static class AnyIdDirFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return extractWriteId(path) != null;
+    }
+  }
+
+  public static class IdPathFilter implements PathFilter {
+    private String baseDirName, deltaDirName;
+    private final boolean isDeltaPrefix;
+
+    public IdPathFilter(long writeId, int stmtId) {
+      String deltaDirName = null;
+      deltaDirName = DELTA_PREFIX + String.format(DELTA_DIGITS, writeId) + "_" +
+          String.format(DELTA_DIGITS, writeId);
+      isDeltaPrefix = (stmtId < 0);
+      if (!isDeltaPrefix) {
+        deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId);
+      }
+
+      this.baseDirName = BASE_PREFIX + String.format(DELTA_DIGITS, writeId);
+      this.deltaDirName = deltaDirName;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.equals(baseDirName) || (isDeltaPrefix && name.startsWith(deltaDirName))
+          || (!isDeltaPrefix && name.equals(deltaDirName));
+    }
+  }
+
+
+  public static Long extractWriteId(Path file) {
+    String fileName = file.getName();
+    if (!fileName.startsWith(DELTA_PREFIX) && !fileName.startsWith(BASE_PREFIX)) {
+      LOG.trace("Cannot extract write ID for a MM table: {}", file);
+      return null;
+    }
+    String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000 or base_0000022
+    if (parts.length < 2) {
+      LOG.debug("Cannot extract write ID for a MM table: " + file
+          + " (" + Arrays.toString(parts) + ")");
+      return null;
+    }
+    long writeId = -1;
+    try {
+      writeId = Long.parseLong(parts[1]);
+    } catch (NumberFormatException ex) {
+      LOG.debug("Cannot extract write ID for a MM table: " + file
+          + "; parsing " + parts[1] + " got " + ex.getMessage());
+      return null;
+    }
+    return writeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0b6967ed/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 7987c4e..611a4c3 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -588,7 +588,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       }
       if (!file.isDirectory()) {
         Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path);
-      } else if (JavaUtils.extractWriteId(path) == null) {
+      } else if (AcidUtils.extractWriteId(path) == null) {
         subdirs.add(path);
       } else if (!hadAcidState) {
         AcidUtils.Directory dirInfo
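
With AnyIdDirFilter now delegating to AcidUtils.extractWriteId, base_n directories pass the filter alongside deltas, which is the point of this change. A quick check against the new code (a hypothetical driver class; assumes hive-exec and hadoop-common on the classpath):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AnyIdDirFilterCheck {
      public static void main(String[] args) {
        AcidUtils.AnyIdDirFilter filter = new AcidUtils.AnyIdDirFilter();
        System.out.println(filter.accept(new Path("/warehouse/t/delta_0000001_0000001_0000"))); // true
        System.out.println(filter.accept(new Path("/warehouse/t/base_0000022")));               // true (previously false)
        System.out.println(filter.accept(new Path("/warehouse/t/000000_0")));                   // false: no delta_/base_ prefix
      }
    }
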