HIVE-11542 : port fileId support on shims and splits from llap branch (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
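At a high level, this change threads an optional HDFS file ID (the inode number) from the shim layer through ACID directory listing and into ORC splits, guarded by the new hive.orc.splits.include.fileid setting. The core abstraction is a small wrapper that pairs a FileStatus with a possibly-null file ID. A minimal sketch of that idea follows; the interface and class names here are illustrative, and only the getFileStatus()/getFileId() shape mirrors the patch's HadoopShims.HdfsFileStatusWithId:

import org.apache.hadoop.fs.FileStatus;

// Sketch of the wrapper pattern: a FileStatus plus an optional inode-based
// file ID that may be absent (null) on file systems without ID support.
interface FileStatusWithOptionalId {
  FileStatus getFileStatus();
  Long getFileId(); // null when no ID is available
}

// Plain "no ID" wrapper, analogous to the HdfsFileStatusWithoutId fallback
// that the patch adds to AcidUtils.
class FileStatusWithoutId implements FileStatusWithOptionalId {
  private final FileStatus fs;

  FileStatusWithoutId(FileStatus fs) {
    this.fs = fs;
  }

  @Override
  public FileStatus getFileStatus() {
    return fs;
  }

  @Override
  public Long getFileId() {
    return null;
  }
}

Callers that cannot obtain an ID (older Hadoop shims, non-HDFS file systems) simply see null and behave exactly as before.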
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3b6825b5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3b6825b5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3b6825b5 Branch: refs/heads/llap Commit: 3b6825b5b61e943e8e41743f5cbf6d640e0ebdf5 Parents: e059409 Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Aug 17 15:16:57 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Aug 17 15:16:57 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 183 ++++++++++++++----- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 97 +++++++--- .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 25 ++- .../hive/ql/txn/compactor/CompactorMR.java | 13 +- .../hadoop/hive/ql/txn/compactor/Initiator.java | 9 +- .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 27 +-- .../hive/ql/io/orc/TestInputOutputFormat.java | 6 +- .../hadoop/hive/shims/Hadoop20SShims.java | 11 ++ .../apache/hadoop/hive/shims/Hadoop23Shims.java | 66 +++++++ .../apache/hadoop/hive/shims/HadoopShims.java | 15 ++ 11 files changed, 348 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9a6781b..da171b1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1024,6 +1024,8 @@ public class HiveConf extends Configuration { "data is read remotely (from the client or HS2 machine) and sent to all the tasks."), HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000, "Max cache size for keeping meta info about orc splits cached in the client."), + HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true, + "Include file ID in splits on file systems thaty support it."), HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10, "How many threads orc should use to create splits in parallel."), HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false, http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c7e0780..30db513 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -28,6 +28,9 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -221,7 +224,7 @@ public class AcidUtils { * Get the list of original files. * @return the list of original files (eg. 
000000_0) */ - List<FileStatus> getOriginalFiles(); + List<HdfsFileStatusWithId> getOriginalFiles(); /** * Get the list of base and delta directories that are valid and not @@ -423,6 +426,20 @@ public class AcidUtils { return false; } + @VisibleForTesting + public static Directory getAcidState(Path directory, + Configuration conf, + ValidTxnList txnList + ) throws IOException { + return getAcidState(directory, conf, txnList, false); + } + + /** State class for getChildState; cannot modify 2 things in a method. */ + private static class TxnBase { + private FileStatus status; + private long txn; + } + /** * Get the ACID state of the given directory. It finds the minimal set of * base and diff directories. Note that because major compactions don't @@ -436,51 +453,40 @@ public class AcidUtils { */ public static Directory getAcidState(Path directory, Configuration conf, - ValidTxnList txnList + ValidTxnList txnList, + boolean useFileIds ) throws IOException { FileSystem fs = directory.getFileSystem(conf); - FileStatus bestBase = null; - long bestBaseTxn = 0; final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>(); List<ParsedDelta> working = new ArrayList<ParsedDelta>(); List<FileStatus> originalDirectories = new ArrayList<FileStatus>(); final List<FileStatus> obsolete = new ArrayList<FileStatus>(); - List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory, - hiddenFileFilter); - for(FileStatus child: children) { - Path p = child.getPath(); - String fn = p.getName(); - if (fn.startsWith(BASE_PREFIX) && child.isDir()) { - long txn = parseBase(p); - if (bestBase == null) { - bestBase = child; - bestBaseTxn = txn; - } else if (bestBaseTxn < txn) { - obsolete.add(bestBase); - bestBase = child; - bestBaseTxn = txn; - } else { - obsolete.add(child); - } - } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { - ParsedDelta delta = parseDelta(child); - if (txnList.isTxnRangeValid(delta.minTransaction, - delta.maxTransaction) != - ValidTxnList.RangeResponse.NONE) { - working.add(delta); - } - } else { - // This is just the directory. We need to recurse and find the actual files. But don't - // do this until we have determined there is no base. This saves time. Plus, - // it is possible that the cleaner is running and removing these original files, - // in which case recursing through them could cause us to get an error. - originalDirectories.add(child); + List<HdfsFileStatusWithId> childrenWithId = null; + if (useFileIds) { + try { + childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter); + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API", t); + useFileIds = false; + } + } + TxnBase bestBase = new TxnBase(); + final List<HdfsFileStatusWithId> original = new ArrayList<>(); + if (childrenWithId != null) { + for (HdfsFileStatusWithId child : childrenWithId) { + getChildState(child.getFileStatus(), child, txnList, working, + originalDirectories, original, obsolete, bestBase); + } + } else { + List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory, hiddenFileFilter); + for (FileStatus child : children) { + getChildState( + child, null, txnList, working, originalDirectories, original, obsolete, bestBase); } } - final List<FileStatus> original = new ArrayList<FileStatus>(); - // if we have a base, the original files are obsolete. - if (bestBase != null) { + // If we have a base, the original files are obsolete. 
+ if (bestBase.status != null) { // remove the entries so we don't get confused later and think we should // use them. original.clear(); @@ -488,12 +494,12 @@ public class AcidUtils { // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original); + findOriginals(fs, origDir, original, useFileIds); } } Collections.sort(working); - long current = bestBaseTxn; + long current = bestBase.txn; int lastStmtId = -1; for(ParsedDelta next: working) { if (next.maxTransaction > current) { @@ -516,7 +522,7 @@ public class AcidUtils { } } - final Path base = bestBase == null ? null : bestBase.getPath(); + final Path base = bestBase.status == null ? null : bestBase.status.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); @@ -528,7 +534,7 @@ public class AcidUtils { } @Override - public List<FileStatus> getOriginalFiles() { + public List<HdfsFileStatusWithId> getOriginalFiles() { return original; } @@ -544,23 +550,100 @@ public class AcidUtils { }; } + private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, + ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories, + List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase) { + Path p = child.getPath(); + String fn = p.getName(); + if (fn.startsWith(BASE_PREFIX) && child.isDir()) { + long txn = parseBase(p); + if (bestBase.status == null) { + bestBase.status = child; + bestBase.txn = txn; + } else if (bestBase.txn < txn) { + obsolete.add(bestBase.status); + bestBase.status = child; + bestBase.txn = txn; + } else { + obsolete.add(child); + } + } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) { + ParsedDelta delta = parseDelta(child); + if (txnList.isTxnRangeValid(delta.minTransaction, + delta.maxTransaction) != + ValidTxnList.RangeResponse.NONE) { + working.add(delta); + } + } else if (child.isDir()) { + // This is just the directory. We need to recurse and find the actual files. But don't + // do this until we have determined there is no base. This saves time. Plus, + // it is possible that the cleaner is running and removing these original files, + // in which case recursing through them could cause us to get an error. + originalDirectories.add(child); + } else { + original.add(createOriginalObj(childWithId, child)); + } + } + + public static HdfsFileStatusWithId createOriginalObj( + HdfsFileStatusWithId childWithId, FileStatus child) { + return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child); + } + + private static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId { + private FileStatus fs; + + public HdfsFileStatusWithoutId(FileStatus fs) { + this.fs = fs; + } + + @Override + public FileStatus getFileStatus() { + return fs; + } + + @Override + public Long getFileId() { + return null; + } + } + /** - * Find the original files (non-ACID layout) recursively under the partition - * directory. + * Find the original files (non-ACID layout) recursively under the partition directory. 
* @param fs the file system - * @param stat the file/directory to add + * @param stat the directory to add * @param original the list of original files * @throws IOException */ private static void findOriginals(FileSystem fs, FileStatus stat, - List<FileStatus> original - ) throws IOException { - if (stat.isDir()) { - for(FileStatus child: SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter)) { - findOriginals(fs, child, original); + List<HdfsFileStatusWithId> original, boolean useFileIds) throws IOException { + assert stat.isDir(); + List<HdfsFileStatusWithId> childrenWithId = null; + if (useFileIds) { + try { + childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter); + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API", t); + useFileIds = false; + } + } + if (childrenWithId != null) { + for (HdfsFileStatusWithId child : childrenWithId) { + if (child.getFileStatus().isDir()) { + findOriginals(fs, child.getFileStatus(), original, useFileIds); + } else { + original.add(child); + } } } else { - original.add(stat); + List<FileStatus> children = SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); + for (FileStatus child : children) { + if (child.isDir()) { + findOriginals(fs, child, original, useFileIds); + } else { + original.add(createOriginalObj(null, child)); + } + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 6ed7872..fd6d2ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -48,12 +48,14 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -73,6 +76,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import 
com.google.common.collect.Lists; @@ -436,26 +440,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, static final class SplitInfo extends ACIDSplitStrategy { private final Context context; private final FileSystem fs; - private final FileStatus file; + private final HdfsFileStatusWithId fileWithId; private final FileInfo fileInfo; private final boolean isOriginal; private final List<DeltaMetaData> deltas; private final boolean hasBase; SplitInfo(Context context, FileSystem fs, - FileStatus file, FileInfo fileInfo, + HdfsFileStatusWithId fileWithId, FileInfo fileInfo, boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir, boolean[] covered) throws IOException { super(dir, context.numBuckets, deltas, covered); this.context = context; this.fs = fs; - this.file = file; + this.fileWithId = fileWithId; this.fileInfo = fileInfo; this.isOriginal = isOriginal; this.deltas = deltas; this.hasBase = hasBase; } + + @VisibleForTesting + public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, FileInfo fileInfo, + boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir, + boolean[] covered) throws IOException { + this(context, fs, AcidUtils.createOriginalObj(null, fileStatus), + fileInfo, isOriginal, deltas, hasBase, dir, covered); + } } /** @@ -465,14 +477,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> { Context context; FileSystem fs; - List<FileStatus> files; + List<HdfsFileStatusWithId> files; boolean isOriginal; List<DeltaMetaData> deltas; Path dir; boolean[] covered; - public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children, - boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) { + public ETLSplitStrategy(Context context, FileSystem fs, Path dir, + List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas, + boolean[] covered) { this.context = context; this.dir = dir; this.fs = fs; @@ -516,14 +529,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public List<SplitInfo> getSplits() throws IOException { List<SplitInfo> result = Lists.newArrayList(); - for (FileStatus file : files) { + for (HdfsFileStatusWithId file : files) { FileInfo info = null; if (context.cacheStripeDetails) { - info = verifyCachedFileInfo(file); + info = verifyCachedFileInfo(file.getFileStatus()); } // ignore files of 0 length - if (file.getLen() > 0) { - result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered)); + if (file.getFileStatus().getLen() > 0) { + result.add(new SplitInfo( + context, fs, file, info, isOriginal, deltas, true, dir, covered)); } } return result; @@ -540,7 +554,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, * as opposed to query execution (split generation does not read or cache file footers). 
*/ static final class BISplitStrategy extends ACIDSplitStrategy { - List<FileStatus> fileStatuses; + List<HdfsFileStatusWithId> fileStatuses; boolean isOriginal; List<DeltaMetaData> deltas; FileSystem fs; @@ -548,7 +562,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Path dir; public BISplitStrategy(Context context, FileSystem fs, - Path dir, List<FileStatus> fileStatuses, boolean isOriginal, + Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) { super(dir, context.numBuckets, deltas, covered); this.context = context; @@ -562,11 +576,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public List<OrcSplit> getSplits() throws IOException { List<OrcSplit> splits = Lists.newArrayList(); - for (FileStatus fileStatus : fileStatuses) { + for (HdfsFileStatusWithId file : fileStatuses) { + FileStatus fileStatus = file.getFileStatus(); String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue() .getHosts(); - OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts, - null, isOriginal, true, deltas, -1); + OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), file.getFileId(), 0, + fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1); splits.add(orcSplit); } @@ -606,7 +621,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, if (!deltas.isEmpty()) { for (int b = 0; b < numBuckets; ++b) { if (!covered[b]) { - splits.add(new OrcSplit(dir, b, 0, new String[0], null, false, false, deltas, -1)); + splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1)); } } } @@ -627,21 +642,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final Context context; private final FileSystem fs; private final Path dir; + private final boolean useFileIds; - FileGenerator(Context context, FileSystem fs, Path dir) { + FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds) { this.context = context; this.fs = fs; this.dir = dir; + this.useFileIds = useFileIds; } @Override public SplitStrategy call() throws IOException { final SplitStrategy splitStrategy; AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, - context.conf, context.transactionList); + context.conf, context.transactionList, useFileIds); List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); Path base = dirInfo.getBaseDirectory(); - List<FileStatus> original = dirInfo.getOriginalFiles(); + List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles(); boolean[] covered = new boolean[context.numBuckets]; boolean isOriginal = base == null; @@ -649,17 +666,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, if (base != null || !original.isEmpty()) { // find the base files (original or new style) - List<FileStatus> children = original; + List<HdfsFileStatusWithId> children = original; if (base != null) { - children = SHIMS.listLocatedStatus(fs, base, - AcidUtils.hiddenFileFilter); + children = findBaseFiles(base, useFileIds); } long totalFileSize = 0; - for (FileStatus child : children) { - totalFileSize += child.getLen(); + for (HdfsFileStatusWithId child : children) { + totalFileSize += child.getFileStatus().getLen(); AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename - (child.getPath(), context.conf); + (child.getFileStatus().getPath(), context.conf); int 
b = opts.getBucket(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. @@ -700,6 +716,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, return splitStrategy; } + + private List<HdfsFileStatusWithId> findBaseFiles( + Path base, boolean useFileIds) throws IOException { + if (useFileIds) { + try { + return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter); + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API", t); + } + } + // Fall back to regular API and create states without ID. + List<FileStatus> children = SHIMS.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter); + List<HdfsFileStatusWithId> result = new ArrayList<>(children.size()); + for (FileStatus child : children) { + result.add(AcidUtils.createOriginalObj(null, child)); + } + return result; + } } /** @@ -709,6 +743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, static final class SplitGenerator implements Callable<List<OrcSplit>> { private final Context context; private final FileSystem fs; + private final HdfsFileStatusWithId fileWithId; private final FileStatus file; private final long blockSize; private final TreeMap<Long, BlockLocation> locations; @@ -728,8 +763,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public SplitGenerator(SplitInfo splitInfo) throws IOException { this.context = splitInfo.context; this.fs = splitInfo.fs; - this.file = splitInfo.file; - this.blockSize = file.getBlockSize(); + this.fileWithId = splitInfo.fileWithId; + this.file = this.fileWithId.getFileStatus(); + this.blockSize = this.file.getBlockSize(); this.fileInfo = splitInfo.fileInfo; locations = SHIMS.getLocationsWithOffset(fs, file); this.isOriginal = splitInfo.isOriginal; @@ -837,8 +873,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, final double splitRatio = (double) length / (double) fileLen; final long scaledProjSize = projColsUncompressedSize > 0 ? 
(long) (splitRatio * projColsUncompressedSize) : fileLen; - return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo, - isOriginal, hasBase, deltas, scaledProjSize); + return new OrcSplit(file.getPath(), fileWithId.getFileId(), offset, length, hosts, + fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize); } /** @@ -1020,9 +1056,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<Future<?>> splitFutures = Lists.newArrayList(); // multi-threaded file statuses and split strategy + boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS); for (Path dir : getInputPaths(conf)) { FileSystem fs = dir.getFileSystem(conf); - FileGenerator fileGenerator = new FileGenerator(context, fs, dir); + FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds); pathFutures.add(context.threadPool.submit(fileGenerator)); } http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 8cf4cc0..cc03df7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -25,6 +25,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -39,6 +41,8 @@ import org.apache.hadoop.mapred.FileSplit; * */ public class OrcSplit extends FileSplit { + private static final Log LOG = LogFactory.getLog(OrcSplit.class); + private ReaderImpl.FileMetaInfo fileMetaInfo; private boolean hasFooter; private boolean isOriginal; @@ -46,7 +50,9 @@ public class OrcSplit extends FileSplit { private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>(); private OrcFile.WriterVersion writerVersion; private long projColsUncompressedSize; + private transient Long fileId; + static final int HAS_FILEID_FLAG = 8; static final int BASE_FLAG = 4; static final int ORIGINAL_FLAG = 2; static final int FOOTER_FLAG = 1; @@ -58,10 +64,13 @@ public class OrcSplit extends FileSplit { super(null, 0, 0, (String[]) null); } - public OrcSplit(Path path, long offset, long length, String[] hosts, + public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts, ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase, List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) { super(path, offset, length, hosts); + // We could avoid serializing file ID and just replace the path with inode-based path. + // However, that breaks bunch of stuff because Hive later looks up things by split path. + this.fileId = fileId; this.fileMetaInfo = fileMetaInfo; hasFooter = this.fileMetaInfo != null; this.isOriginal = isOriginal; @@ -77,7 +86,8 @@ public class OrcSplit extends FileSplit { int flags = (hasBase ? BASE_FLAG : 0) | (isOriginal ? ORIGINAL_FLAG : 0) | - (hasFooter ? FOOTER_FLAG : 0); + (hasFooter ? FOOTER_FLAG : 0) | + (fileId != null ? 
HAS_FILEID_FLAG : 0); out.writeByte(flags); out.writeInt(deltas.size()); for(AcidInputFormat.DeltaMetaData delta: deltas) { @@ -99,6 +109,9 @@ public class OrcSplit extends FileSplit { footerBuff.limit() - footerBuff.position()); WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId()); } + if (fileId != null) { + out.writeLong(fileId.longValue()); + } } @Override @@ -110,6 +123,7 @@ public class OrcSplit extends FileSplit { hasFooter = (FOOTER_FLAG & flags) != 0; isOriginal = (ORIGINAL_FLAG & flags) != 0; hasBase = (BASE_FLAG & flags) != 0; + boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0; deltas.clear(); int numDeltas = in.readInt(); @@ -134,6 +148,9 @@ public class OrcSplit extends FileSplit { fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff, writerVersion); } + if (hasFileId) { + fileId = in.readLong(); + } } ReaderImpl.FileMetaInfo getFileMetaInfo(){ @@ -159,4 +176,8 @@ public class OrcSplit extends FileSplit { public long getProjectedColumnsUncompressedSize() { return projColsUncompressedSize; } + + public Long getFileId() { + return fileId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 8e431b2..02fa725 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -133,7 +134,8 @@ public class CompactorMR { // and discovering that in getSplits is too late as we then have no way to pass it to our // mapper. - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState( + new Path(sd.getLocation()), conf, txns, false); StringableList dirsToSearch = new StringableList(); Path baseDir = null; if (isMajor) { @@ -141,12 +143,13 @@ public class CompactorMR { // partition is just now being converted to ACID. baseDir = dir.getBaseDirectory(); if (baseDir == null) { - List<FileStatus> originalFiles = dir.getOriginalFiles(); + List<HdfsFileStatusWithId> originalFiles = dir.getOriginalFiles(); if (!(originalFiles == null) && !(originalFiles.size() == 0)) { // There are original format files - for (FileStatus stat : originalFiles) { - dirsToSearch.add(stat.getPath()); - LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search"); + for (HdfsFileStatusWithId stat : originalFiles) { + Path path = stat.getFileStatus().getPath(); + dirsToSearch.add(path); + LOG.debug("Adding original file " + path + " to dirs to search"); } // Set base to the location so that the input format reads the original files. 
baseDir = new Path(sd.getLocation()); http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 73715c6..9bf725d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -223,7 +224,7 @@ public class Initiator extends CompactorThread { boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); - AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false); Path base = dir.getBaseDirectory(); long baseSize = 0; FileStatus stat = null; @@ -236,9 +237,9 @@ public class Initiator extends CompactorThread { baseSize = sumDirSize(fs, base); } - List<FileStatus> originals = dir.getOriginalFiles(); - for (FileStatus origStat : originals) { - baseSize += origStat.getLen(); + List<HdfsFileStatusWithId> originals = dir.getOriginalFiles(); + for (HdfsFileStatusWithId origStat : originals) { + baseSize += origStat.getFileStatus().getLen(); } long deltaSize = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index f8ded12..b6ba862 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.junit.Test; import java.util.List; @@ -102,13 +103,14 @@ public class TestAcidUtils { assertEquals(null, dir.getBaseDirectory()); assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); - List<FileStatus> result = dir.getOriginalFiles(); + List<HdfsFileStatusWithId> result = dir.getOriginalFiles(); assertEquals(5, result.size()); - assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString()); - assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/000000_0", 
result.get(0).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/subdir/000000_0", + result.get(4).getFileStatus().getPath().toString()); } @Test @@ -136,13 +138,14 @@ public class TestAcidUtils { obsolete.get(0).getPath().toString()); assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(1).getPath().toString()); - List<FileStatus> result = dir.getOriginalFiles(); + List<HdfsFileStatusWithId> result = dir.getOriginalFiles(); assertEquals(5, result.size()); - assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString()); - assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString()); + assertEquals("mock:/tbl/part1/subdir/000000_0", + result.get(4).getFileStatus().getPath().toString()); List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories(); assertEquals(2, deltas.size()); AcidUtils.ParsedDelta delt = deltas.get(0); http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 0c12c89..547e799 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -483,7 +483,7 @@ public class TestInputOutputFormat { final OrcInputFormat.Context context = new OrcInputFormat.Context( conf, n); OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator( - context, fs, new MockPath(fs, "mock:/a/b")); + context, fs, new MockPath(fs, "mock:/a/b"), false); final SplitStrategy splitStrategy = gen.call(); assertTrue( String.format( @@ -507,7 +507,7 @@ public class TestInputOutputFormat { new MockFile("mock:/a/b/part-04", 1000, new byte[0])); OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, - new MockPath(fs, "mock:/a/b")); + new MockPath(fs, "mock:/a/b"), false); SplitStrategy splitStrategy = gen.call(); assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy); @@ -520,7 +520,7 @@ public class TestInputOutputFormat { new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]), new MockFile("mock:/a/b/part-04", 1000, new byte[1000])); gen = new OrcInputFormat.FileGenerator(context, fs, - new MockPath(fs, "mock:/a/b")); + new MockPath(fs, "mock:/a/b"), false); splitStrategy = gen.call(); assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy); 
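Before the shim-level changes below, note how the split carries the ID over the wire. OrcSplit (above) reserves one bit of its header byte, HAS_FILEID_FLAG, and writes the long only when an ID is present; readFields checks the same bit before reading it back. A self-contained round-trip sketch of that pattern, where only the flag-bit idea is taken from the patch and the class and method names are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OptionalFileIdRoundTrip {
  // Same idea as OrcSplit's HAS_FILEID_FLAG: one bit in the header byte
  // records whether a file ID follows in the serialized form.
  static final int HAS_FILEID_FLAG = 8;

  static byte[] write(Long fileId) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    int flags = (fileId != null ? HAS_FILEID_FLAG : 0);
    out.writeByte(flags);
    // ... the real split writes deltas, footer metadata, etc. here ...
    if (fileId != null) {
      out.writeLong(fileId.longValue());
    }
    out.flush();
    return bytes.toByteArray();
  }

  static Long read(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int flags = in.readByte();
    boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0;
    // ... the other fields would be read back here, in the same order ...
    return hasFileId ? in.readLong() : null;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(read(write(1234567L))); // prints 1234567
    System.out.println(read(write(null)));     // prints null
  }
}

As the comment added to the OrcSplit constructor notes, the ID is carried as an extra field rather than folded into an inode-based path, because Hive later looks splits up by their path.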
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java ---------------------------------------------------------------------- diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index ffffcb7..a56309f 100644 --- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -722,4 +722,15 @@ public class Hadoop20SShims extends HadoopShimsSecure { Token<?> fsToken = fs.getDelegationToken(uname); cred.addToken(fsToken.getService(), fsToken); } + + @Override + public List<HdfsFileStatusWithId> listLocatedHdfsStatus( + FileSystem fs, Path path, PathFilter filter) throws IOException { + throw new UnsupportedOperationException("Not supported on old version"); + } + + @Override + public long getFileId(FileSystem fs, String path) throws IOException { + throw new UnsupportedOperationException("Not supported on old version"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9eae0ac..e5be8d6 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -61,10 +61,13 @@ import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.io.LongWritable; @@ -662,6 +665,64 @@ public class Hadoop23Shims extends HadoopShimsSecure { return result; } + private static final class HdfsFileStatusWithIdImpl implements HdfsFileStatusWithId { + private final LocatedFileStatus lfs; + private final long fileId; + + public HdfsFileStatusWithIdImpl(LocatedFileStatus lfs, long fileId) { + this.lfs = lfs; + this.fileId = fileId; + } + + @Override + public FileStatus getFileStatus() { + return lfs; + } + + @Override + public Long getFileId() { + return fileId; + } + } + + @Override + public List<HdfsFileStatusWithId> listLocatedHdfsStatus( + FileSystem fs, Path p, PathFilter filter) throws IOException { + DistributedFileSystem dfs = ensureDfs(fs); + DFSClient dfsc = dfs.getClient(); + final String src = p.toUri().getPath(); + DirectoryListing current = dfsc.listPaths(src, + org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true); + if (current == null) { // the directory does not exist + throw new FileNotFoundException("File " + p + " does not exist."); + } + final URI fsUri = fs.getUri(); + List<HdfsFileStatusWithId> result = new ArrayList<HdfsFileStatusWithId>( + 
current.getPartialListing().length); + while (current != null) { + org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = current.getPartialListing(); + for (int i = 0; i < hfss.length; ++i) { + HdfsLocatedFileStatus next = (HdfsLocatedFileStatus)(hfss[i]); + if (filter != null) { + Path filterPath = next.getFullPath(p).makeQualified(fsUri, null); + if (!filter.accept(filterPath)) continue; + } + LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p); + result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId())); + } + current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null; + } + return result; + } + + private DistributedFileSystem ensureDfs(FileSystem fs) { + if (!(fs instanceof DistributedFileSystem)) { + throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass()); + } + DistributedFileSystem dfs = (DistributedFileSystem)fs; + return dfs; + } + @Override public BlockLocation[] getLocations(FileSystem fs, FileStatus status) throws IOException { @@ -1352,4 +1413,9 @@ public class Hadoop23Shims extends HadoopShimsSecure { // Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS. fs.addDelegationTokens(uname, cred); } + + @Override + public long getFileId(FileSystem fs, String path) throws IOException { + return ensureDfs(fs).getClient().getFileInfo(path).getFileId(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 74785e5..2b6f322 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -256,6 +256,10 @@ public interface HadoopShims { List<FileStatus> listLocatedStatus(FileSystem fs, Path path, PathFilter filter) throws IOException; + + List<HdfsFileStatusWithId> listLocatedHdfsStatus( + FileSystem fs, Path path, PathFilter filter) throws IOException; + /** * For file status returned by listLocatedStatus, convert them into a list * of block locations. @@ -316,6 +320,11 @@ public interface HadoopShims { public void debugLog(); } + public interface HdfsFileStatusWithId { + public FileStatus getFileStatus(); + public Long getFileId(); + } + public HCatHadoopShims getHCatShim(); public interface HCatHadoopShims { @@ -731,4 +740,10 @@ public interface HadoopShims { * @throws IOException If an error occurred on adding the token. */ public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException; + + /** + * Gets file ID. Only supported on hadoop-2. + * @return inode ID of the file. + */ + long getFileId(FileSystem fs, String path) throws IOException; }
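The new shim surface above (HdfsFileStatusWithId, listLocatedHdfsStatus, getFileId) is consumed with one consistent fallback pattern in AcidUtils.getAcidState, AcidUtils.findOriginals, and OrcInputFormat's FileGenerator: try the ID-aware listing, and on any failure log the error and fall back to the plain listLocatedStatus, wrapping the results with a null ID. A condensed sketch of that pattern, assuming only the shim methods defined in this patch (the helper class and its method name are illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;

// Illustrative helper (not part of the patch) showing the listing fallback
// the patch repeats in AcidUtils and OrcInputFormat.
class FileIdAwareListing {
  private static final Log LOG = LogFactory.getLog(FileIdAwareListing.class);
  private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();

  static List<HdfsFileStatusWithId> list(FileSystem fs, Path dir, PathFilter filter,
      boolean useFileIds) throws IOException {
    if (useFileIds) {
      try {
        // Fast path: HDFS listing that also returns inode-based file IDs.
        return SHIMS.listLocatedHdfsStatus(fs, dir, filter);
      } catch (Throwable t) {
        LOG.error("Failed to get files with ID; using regular API", t);
      }
    }
    // Fallback: regular listing, wrapped so callers always see one type.
    List<FileStatus> children = SHIMS.listLocatedStatus(fs, dir, filter);
    List<HdfsFileStatusWithId> result = new ArrayList<HdfsFileStatusWithId>(children.size());
    for (final FileStatus child : children) {
      result.add(new HdfsFileStatusWithId() {
        @Override
        public FileStatus getFileStatus() { return child; }
        @Override
        public Long getFileId() { return null; }
      });
    }
    return result;
  }
}

Because Hadoop20SShims throws UnsupportedOperationException from listLocatedHdfsStatus and Hadoop23Shims rejects non-DistributedFileSystem instances, this catch-and-fall-back is what keeps older Hadoop versions and non-HDFS file systems working even with hive.orc.splits.include.fileid left at its default of true.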