http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 95a60dc..73f27e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -88,11 +88,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ */ private int statementId;//sort on this descending, like currentTransactionId - public ReaderKey() { + ReaderKey() { this(-1, -1, -1, -1, 0); } - public ReaderKey(long originalTransaction, int bucket, long rowId, + ReaderKey(long originalTransaction, int bucket, long rowId, long currentTransactionId) { this(originalTransaction, bucket, rowId, currentTransactionId, 0); } @@ -196,6 +196,34 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ void next(OrcStruct next) throws IOException; } /** + * Used when base_x/bucket_N is missing - makes control flow a bit easier + */ + private class EmptyReaderPair implements ReaderPair { + @Override public OrcStruct nextRecord() { + return null; + } + @Override public int getColumns() { + return 0; + } + @Override public RecordReader getRecordReader() { + return null; + } + @Override public Reader getReader() { + return null; + } + @Override public RecordIdentifier getMinKey() { + return null; + } + @Override public RecordIdentifier getMaxKey() { + return null; + } + @Override public ReaderKey getKey() { + return null; + } + @Override public void next(OrcStruct next) throws IOException { + } + } + /** * A reader and the next record from that reader. The code reads ahead so that * we can return the lowest ReaderKey from each of the readers. 
Thus, the * next available row is nextRecord and only following records are still in @@ -209,6 +237,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final ReaderKey key; private final RecordIdentifier minKey; private final RecordIdentifier maxKey; + @Deprecated//HIVE-18158 private final int statementId; /** @@ -320,12 +349,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final ReaderKey key; final int bucketId; final int bucketProperty; + /** + * TransactionId to use when generating synthetic ROW_IDs + */ + final long transactionId; - OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException { + OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions, + int statementId) throws IOException { this.key = key; this.bucketId = bucketId; assert bucketId >= 0 : "don't support non-bucketed tables yet"; - this.bucketProperty = encodeBucketId(conf, bucketId); + this.bucketProperty = encodeBucketId(conf, bucketId, statementId); + transactionId = mergeOptions.getTransactionId(); } @Override public final OrcStruct nextRecord() { return nextRecord; @@ -337,7 +372,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ @Override public final ReaderKey getKey() { return key; } /** - * The cumulative number of row in all files of the logical bucket that precede the file + * The cumulative number of rows in all files of the logical bucket that precede the file * represented by {@link #getRecordReader()} */ abstract long getRowIdOffset(); @@ -355,9 +390,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation); nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, - new LongWritable(0)); + new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketProperty)); nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID, @@ -369,17 +404,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) - .set(0); + .set(transactionId); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucketProperty); ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) - .set(0); + .set(transactionId); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); nextRecord().setFieldValue(OrcRecordUpdater.ROW, getRecordReader().next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucketProperty, nextRowId, 0L, 0); + key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0); if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + getMaxKey()); @@ -391,9 +426,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ return false;//reached EndOfFile } } - static int encodeBucketId(Configuration conf, int bucketId) { - return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId)); + static int encodeBucketId(Configuration conf, int bucketId, 
int statementId) { + return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId) + .statementId(statementId)); } + /** + * This handles normal read (as opposed to Compaction) of a {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + * file. These may be a result of Load Data or it may be a file that was written to the table + * before it was converted to acid. + */ @VisibleForTesting final static class OriginalReaderPairToRead extends OriginalReaderPair { private final long rowIdOffset; @@ -401,12 +442,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final RecordReader recordReader; private final RecordIdentifier minKey; private final RecordIdentifier maxKey; - OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId, final RecordIdentifier minKey, final RecordIdentifier maxKey, Reader.Options options, Options mergerOptions, Configuration conf, - ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + ValidTxnList validTxnList, int statementId) throws IOException { + super(key, bucketId, conf, mergerOptions, statementId); this.reader = reader; assert !mergerOptions.isCompacting(); assert mergerOptions.getRootPath() != null : "Since we have original files"; @@ -426,6 +466,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ boolean haveSeenCurrentFile = false; long rowIdOffsetTmp = 0; { + /** + * Note that for reading base_x/ or delta_x_x/ with non-acid schema, + * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's + * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} + */ //the split is from something other than the 1st file of the logical bucket - compute offset AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validTxnList, false, true); @@ -458,7 +503,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ if (rowIdOffset > 0) { //rowIdOffset could be 0 if all files before current one are empty /** - * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)} + * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration, Options)} * need to fix min/max key since these are used by * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for * the key. Clear? 
*/ @@ -469,7 +514,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * If this is not the 1st file, set minKey 1 less than the start of current file * (Would not need to set minKey if we knew that there are no delta files) * {@link #advanceToMinKey()} needs this */ - newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1); + newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1); } if (maxKey != null) { maxKey.setRowId(maxKey.getRowId() + rowIdOffset); @@ -482,7 +527,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * of the file so we want to leave it blank to make sure any insert events in delta * files are included; Conversely, if it's not the last file, set the maxKey so that * events from deltas that don't modify anything in the current split are excluded*/ - newMaxKey = new RecordIdentifier(0, bucketProperty, + newMaxKey = new RecordIdentifier(transactionId, bucketProperty, rowIdOffset + reader.getNumberOfRows() - 1); } this.minKey = newMinKey; @@ -532,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ OriginalReaderPairToCompact(ReaderKey key, int bucketId, Reader.Options options, Options mergerOptions, Configuration conf, - ValidTxnList validTxnList) throws IOException { - super(key, bucketId, conf); + ValidTxnList validTxnList, int statementId) throws IOException { + super(key, bucketId, conf, mergerOptions, statementId); assert mergerOptions.isCompacting() : "Should only be used for Compaction"; this.conf = conf; this.options = options; @@ -544,9 +589,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ assert options.getMaxOffset() == Long.MAX_VALUE; AcidUtils.Directory directoryState = AcidUtils.getAcidState( mergerOptions.getRootPath(), conf, validTxnList, false, true); + /** + * Note that for reading base_x/ or delta_x_x/ with non-acid schema, + * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's + * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} + */ originalFiles = directoryState.getOriginalFiles(); assert originalFiles.size() > 0; - this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket + //in case of Compaction, this is the 1st file of the current bucket + this.reader = advanceToNextFile(); if (reader == null) { //Compactor generated a split for a bucket that has no data? 
throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId + @@ -655,7 +706,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ */ private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket, Reader.Options options, - Configuration conf) throws IOException { + Configuration conf, Options mergerOptions) throws IOException { long rowLength = 0; long rowOffset = 0; long offset = options.getOffset();//this would usually be at block boundary @@ -663,7 +714,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ boolean isTail = true; RecordIdentifier minKey = null; RecordIdentifier maxKey = null; - int bucketProperty = encodeBucketId(conf, bucket); + TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs( + mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf); + int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId); /** * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't * necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st @@ -755,13 +808,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts. * This makes the "context" explicit. */ - static class Options { + static class Options implements Cloneable { private int copyIndex = 0; private boolean isCompacting = false; private Path bucketPath; private Path rootPath; + private Path baseDir; private boolean isMajorCompaction = false; private boolean isDeleteReader = false; + private long transactionId = 0; Options copyIndex(int copyIndex) { assert copyIndex >= 0; this.copyIndex = copyIndex; @@ -790,6 +845,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ assert !isCompacting; return this; } + Options transactionId(long transactionId) { + this.transactionId = transactionId; + return this; + } + Options baseDir(Path baseDir) { + this.baseDir = baseDir; + return this; + } /** * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix */ @@ -825,13 +888,48 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ boolean isDeleteReader() { return isDeleteReader; } + /** + * for reading "original" files - i.e. not native acid schema. Default value of 0 is + * appropriate for files that existed in a table before it was made transactional. 0 is the + * primordial transaction. For non-native files resulting from Load Data command, they + * are located and base_x or delta_x_x and then transactionId == x. + */ + long getTransactionId() { + return transactionId; + } + + /** + * In case of isMajorCompaction() this is the base dir from the Compactor, i.e. either a base_x + * or {@link #rootPath} if it's the 1st major compaction after non-acid2acid conversion + */ + Path getBaseDir() { + return baseDir; + } + /** + * shallow clone + */ + public Options clone() { + try { + return (Options) super.clone(); + } + catch(CloneNotSupportedException ex) { + throw new AssertionError(); + } + } } /** - * Create a reader that merge sorts the ACID events together. + * Create a reader that merge sorts the ACID events together. This handles + * 1. 'normal' reads on behalf of a query (non vectorized) + * 2. Compaction reads (major/minor) + * 3. 
Delete event reads - to create a sorted view of all delete events for vectorized read + * + * This makes the logic in the constructor confusing and needs to be refactored. Liberal use of + * asserts below is primarily for documentation purposes. + * * @param conf the configuration * @param collapseEvents should the events on the same row be collapsed - * @param isOriginal is the base file a pre-acid file - * @param bucket the bucket we are reading + * @param isOriginal if reading filws w/o acid schema - {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + * @param bucket the bucket/writer id of the file we are reading * @param options the options to read with * @param deltaDirectory the list of delta directories to include * @throws IOException @@ -887,11 +985,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ objectInspector = OrcRecordUpdater.createEventSchema (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction"; // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); + //suppose it's the first Major compaction so we only have deltas + boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction() + && mergerOptions.getBaseDir() == null; if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) || - mergerOptions.isDeleteReader()) { + mergerOptions.isDeleteReader() || isMajorNoBase) { //for minor compaction, there is no progress report and we don't filter deltas baseReader = null; minKey = maxKey = null; @@ -906,27 +1008,68 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } else { // find the min/max based on the offset and length (and more for 'original') if (isOriginal) { - keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf); + //note that this KeyInterval may be adjusted later due to copy_N files + keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf, mergerOptions); } else { keyInterval = discoverKeyBounds(reader, options); } } LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey()); // use the min/max instead of the byte range - ReaderPair pair; + ReaderPair pair = null; ReaderKey key = new ReaderKey(); if (isOriginal) { options = options.clone(); if(mergerOptions.isCompacting()) { - pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions, - conf, validTxnList); + assert mergerOptions.isMajorCompaction(); + Options readerPairOptions = mergerOptions; + if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) { + readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, + AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir()); + } + pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions, + conf, validTxnList, + 0);//0 since base_x doesn't have a suffix (neither does pre acid write) } else { + assert mergerOptions.getBucketPath() != null : " since this is not compaction: " + + mergerOptions.getRootPath(); + //if here it's a non-acid schema file - check if from before table was marked transactional + //or in base_x/delta_x_x from Load Data + Options readerPairOptions = mergerOptions; + TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs( + mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf); + 
if(tfp.syntheticTransactionId > 0) { + readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, + tfp.syntheticTransactionId, tfp.folder); + } pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), - keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList); + keyInterval.getMaxKey(), options, readerPairOptions, conf, validTxnList, tfp.statementId); } } else { - pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), - eventOptions, 0); + if(mergerOptions.isCompacting()) { + assert mergerOptions.isMajorCompaction() : "expected major compaction: " + + mergerOptions.getBaseDir() + ":" + bucket; + assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath(); + //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty + FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf); + Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket); + if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) { + //doing major compaction - it's possible where full compliment of bucket files is not + //required (on Tez) that base_x/ doesn't have a file for 'bucket' + reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf)); + pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), + eventOptions, 0); + } + else { + pair = new EmptyReaderPair(); + LOG.info("No non-empty " + bucketPath + " was found for Major compaction"); + } + } + else { + assert reader != null : "no reader? " + mergerOptions.getRootPath(); + pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(), + eventOptions, 0); + } } minKey = pair.getMinKey(); maxKey = pair.getMaxKey(); @@ -937,11 +1080,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } baseReader = pair.getRecordReader(); } - - if (deltaDirectory != null) { - /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no - * user columns - * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/ + /*now process the delta files. For normal read these should only be delete deltas. For + * Compaction these may be any delta_x_y/. The files inside any delta_x_y/ may be in Acid + * format (i.e. with Acid metadata columns) or 'original'.*/ + if (deltaDirectory != null && deltaDirectory.length > 0) { + /*For reads, whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no + * user columns. For Compaction there is never a SARG. 
+ * */ Reader.Options deltaEventOptions = eventOptions.clone() .searchArgument(null, null).range(0, Long.MAX_VALUE); for(Path delta: deltaDirectory) { @@ -950,17 +1095,50 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ throw new IllegalStateException(delta + " is not delete delta and is not compacting."); } ReaderKey key = new ReaderKey(); - AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); + AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf)); + if(deltaDir.isRawFormat()) { + assert !deltaDir.isDeleteDelta() : delta.toString(); + assert mergerOptions.isCompacting() : "during regular read anything which is not a" + + " delete_delta is treated like base: " + delta; + Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions, + deltaDir.getMinTransaction(), delta); + //this will also handle copy_N files if any + ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options, + rawCompactOptions, conf, validTxnList, deltaDir.getStatementId()); + if (deltaPair.nextRecord() != null) { + readers.put(key, deltaPair); + } + continue; + } for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) { FileSystem fs = deltaFile.getFileSystem(conf); if(!fs.exists(deltaFile)) { + /** + * it's possible that the file for a specific {@link bucket} doesn't exist in any given + * delta since since no rows hashed to it (and not configured to create empty buckets) + */ continue; } + if(deltaDir.isDeleteDelta()) { + //if here it maybe compaction or regular read or Delete event sorter + //in the later 2 cases we should do: + //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta + Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf)); + ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, + deltaEventOptions, deltaDir.getStatementId()); + if (deltaPair.nextRecord() != null) { + readers.put(key, deltaPair); + } + continue; + } + //if here then we must be compacting + assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta; /* side files are only created by streaming ingest. If this is a compaction, we may * have an insert delta/ here with side files there because the original writer died.*/ long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile)); assert length >= 0; Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); + //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey, deltaEventOptions, deltaDir.getStatementId()); if (deltaPair.nextRecord() != null) { @@ -988,6 +1166,76 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } /** + * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + * type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read + * time and made permanent at compaction time. This is identical to how 'original' files (i.e. + * those that existed in the table before it was converted to an Acid table) except that the + * transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data. 
+ */ + static final class TransactionMetaData { + final long syntheticTransactionId; + /** + * folder which determines the transaction id to use in synthetic ROW_IDs + */ + final Path folder; + final int statementId; + TransactionMetaData(long syntheticTransactionId, Path folder) { + this(syntheticTransactionId, folder, 0); + } + TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) { + this.syntheticTransactionId = syntheticTransactionId; + this.folder = folder; + this.statementId = statementId; + } + static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath, + Configuration conf) throws IOException { + Path parent = splitPath.getParent(); + if(rootPath.equals(parent)) { + //the 'isOriginal' file is at the root of the partition (or table) thus it is + //from a pre-acid conversion write and belongs to primordial txnid:0. + return new TransactionMetaData(0, parent); + } + while(parent != null && !rootPath.equals(parent)) { + boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX); + if(isBase || isDelta) { + if(isBase) { + return new TransactionMetaData(AcidUtils.parseBase(parent), parent); + } + else { + AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, + parent.getFileSystem(conf)); + assert pd.getMinTransaction() == pd.getMaxTransaction() : + "This a delta with raw non acid schema, must be result of single write, no compaction: " + + splitPath; + return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId()); + } + } + parent = parent.getParent(); + } + if(parent == null) { + //spit is marked isOriginal but it's not an immediate child of a partition nor is it in a + //base/ or delta/ - this should never happen + throw new IllegalStateException("Cannot determine transaction id for original file " + + splitPath + " in " + rootPath); + } + //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid + // converted table + return new TransactionMetaData(0, rootPath); + } + } + /** + * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which + * happens as a result of Load Data statement. Setting {@code rootPath} to base_x/ or delta_x_x + * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent + * {@link OriginalReaderPair} object to return the files in this dir + * in {@link AcidUtils.Directory#getOriginalFiles()} + * @return modified clone of {@code baseOptions} + */ + private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) { + return baseOptions.clone().transactionId(transactionId).rootPath(rootPath); + } + /** * This determines the set of {@link ReaderPairAcid} to create for a given delta/. * For unbucketed tables {@code bucket} can be thought of as a write tranche. */
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 315cc1d..8af38b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -196,7 +196,9 @@ public class OrcRecordUpdater implements RecordUpdater { fields.add(new OrcStruct.Field("row", rowInspector, ROW)); return new OrcStruct.OrcStructInspector(fields); } - + /** + * @param path - partition root + */ OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 58638b5..edffa5b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -51,6 +51,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class); private OrcTail orcTail; private boolean hasFooter; + /** + * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + */ private boolean isOriginal; private boolean hasBase; //partition root http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index bcde4fc..d571bd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.impl.AcidStats; @@ -156,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader this.vectorizedRowBatchBase = baseReader.createValue(); } - private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter, + private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter, VectorizedRowBatchCtx rowBatchCtx) throws IOException { this.rbCtx = rowBatchCtx; final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); @@ -165,12 +164,10 @@ public class VectorizedOrcAcidRowBatchReader // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is // enabled for an ACID case and the file format is ORC. 
- boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate() - || !(inputSplit instanceof OrcSplit); + boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate(); if (isReadNotAllowed) { OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf); } - final OrcSplit orcSplit = (OrcSplit) inputSplit; reporter.setStatus(orcSplit.toString()); readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf)); @@ -226,9 +223,11 @@ public class VectorizedOrcAcidRowBatchReader private static final class OffsetAndBucketProperty { private final long rowIdOffset; private final int bucketProperty; - private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) { + private final long syntheticTxnId; + private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) { this.rowIdOffset = rowIdOffset; this.bucketProperty = bucketProperty; + this.syntheticTxnId = syntheticTxnId; } } /** @@ -240,17 +239,34 @@ public class VectorizedOrcAcidRowBatchReader * * todo: This logic is executed per split of every "original" file. The computed result is the * same for every split form the same file so this could be optimized by moving it to - * before/during splt computation and passing the info in the split. (HIVE-17917) + * before/during split computation and passing the info in the split. (HIVE-17917) */ private OffsetAndBucketProperty computeOffsetAndBucket( OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException { - if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) { - return new OffsetAndBucketProperty(0,0); + if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) { + if(split.isOriginal()) { + /** + * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that + * created the file to see if it's committed. See more in + * {@link #next(NullWritable, VectorizedRowBatch)}. 
(In practice getAcidState() should + * filter out base/delta files but this makes fewer dependencies) + */ + OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo = + OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(), + split.getRootDir(), conf); + return new OffsetAndBucketProperty(-1,-1, + syntheticTxnInfo.syntheticTransactionId); + } + return null; } long rowIdOffset = 0; + OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo = + OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(), + split.getRootDir(), conf); int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId(); - int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId)); - AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf, + int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf) + .statementId(syntheticTxnInfo.statementId).bucket(bucketId)); + AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf, validTxnList, false, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { AcidOutputFormat.Options bucketOptions = @@ -266,7 +282,8 @@ public class VectorizedOrcAcidRowBatchReader OrcFile.readerOptions(conf)); rowIdOffset += reader.getNumberOfRows(); } - return new OffsetAndBucketProperty(rowIdOffset, bucketProperty); + return new OffsetAndBucketProperty(rowIdOffset, bucketProperty, + syntheticTxnInfo.syntheticTransactionId); } /** * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables. @@ -284,7 +301,7 @@ public class VectorizedOrcAcidRowBatchReader if(rbCtx == null) { throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath()); } - return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx)); + return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx)); } /** @@ -292,8 +309,8 @@ public class VectorizedOrcAcidRowBatchReader * Even if ROW__ID is not projected you still need to decorate the rows with them to see if * any of the delete events apply. */ - private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) { - return split.isOriginal() && (hasDeletes || rowIdProjected); + private static boolean needSyntheticRowIds(boolean isOriginal, boolean hasDeletes, boolean rowIdProjected) { + return isOriginal && (hasDeletes || rowIdProjected); } private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { if(rbCtx.getVirtualColumnCount() == 0) { @@ -316,7 +333,7 @@ public class VectorizedOrcAcidRowBatchReader if (orcSplit.isOriginal()) { root = orcSplit.getRootDir(); } else { - root = path.getParent().getParent(); + root = path.getParent().getParent();//todo: why not just use getRootDir()? 
assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() + " path.p.p=" + root; } @@ -398,7 +415,9 @@ public class VectorizedOrcAcidRowBatchReader * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order * to see if any deletes apply */ - if(rowIdProjected || !deleteEventRegistry.isEmpty()) { + if(needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected)) { + assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps; + assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps; if(innerReader == null) { throw new IllegalStateException(getClass().getName() + " requires " + org.apache.orc.RecordReader.class + @@ -409,8 +428,7 @@ public class VectorizedOrcAcidRowBatchReader */ recordIdColumnVector.fields[0].noNulls = true; recordIdColumnVector.fields[0].isRepeating = true; - //all "original" is considered written by txnid:0 which committed - ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0; + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; /** * This is {@link RecordIdentifier#getBucketProperty()} * Also see {@link BucketCodec} @@ -433,15 +451,21 @@ public class VectorizedOrcAcidRowBatchReader innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0]; innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + //these are insert events so (original txn == current) txn for all rows + innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0]; + } + if(syntheticProps.syntheticTxnId > 0) { + //"originals" (written before table was converted to acid) is considered written by + // txnid:0 which is always committed so there is no need to check wrt invalid transactions + //But originals written by Load Data for example can be in base_x or delta_x_x so we must + //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline. + findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, + vectorizedRowBatchBase.size, selectedBitSet); } } else { // Case 1- find rows which belong to transactions that are not valid. findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); - /** - * All "original" data belongs to txnid:0 and is always valid/committed for every reader - * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal - */ } // Case 2- find rows which have been deleted. @@ -473,11 +497,6 @@ public class VectorizedOrcAcidRowBatchReader } else { // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. - // NOTE: We only link up the user columns and not the ACID metadata columns because this - // vectorized code path is not being used in cases of update/delete, when the metadata columns - // would be expected to be passed up the operator pipeline. This is because - // currently the update/delete specifically disable vectorized code paths. - // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; // Transfer columnVector objects from base batch to outgoing batch. 
System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index f7388a4..736034d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -27,12 +27,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.io.NullWritable; http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 6fb0c43..fdb3603 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.LockTableDesc; import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; +import org.apache.hadoop.hive.ql.plan.api.Query; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; @@ -297,6 +298,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl { break; default: if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) { + if(allowOperationInATransaction(queryPlan)) { + break; + } + //look at queryPlan.outputs(WriteEntity.t - that's the table) //for example, drop table in an explicit txn is not allowed //in some cases this requires looking at more than just the operation //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table @@ -311,6 +316,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl { any non acid and raise an appropriate error * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/ } + + /** + * This modifies the logic wrt what operations are allowed in a transaction. Multi-statement + * transaction support is incomplete but it makes some Acid tests cases much easier to write. + */ + private boolean allowOperationInATransaction(QueryPlan queryPlan) { + //Acid and MM tables support Load Data with transactional semantics. This will allow Load Data + //in a txn assuming we can determine the target is a suitable table type. 
+ if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) { + WriteEntity writeEntity = queryPlan.getOutputs().iterator().next(); + if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) { + switch (writeEntity.getWriteType()) { + case INSERT: + //allow operation in a txn + return true; + case INSERT_OVERWRITE: + //see HIVE-18154 + return false; + default: + //not relevant for LOAD + return false; + } + } + } + //todo: handle Insert Overwrite as well: HIVE-18154 + return false; + } /** * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 1a37bf7..9f2c6d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; @@ -1705,18 +1706,20 @@ public class Hive { * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid - * true if this is an ACID operation + * @param isAcidIUDoperation + * true if this is an ACID operation Insert/Update/Delete operation * @param hasFollowingStatsTask * true if there is a following task which updates the stats, so, this method need not update. * @return Partition object being loaded with data */ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId) + boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); + assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); try { // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); @@ -1768,7 +1771,7 @@ public class Hive { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); } - assert !isAcid; + assert !isAcidIUDoperation; if (areEventsForDmlNeeded(tbl, oldPart)) { newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } @@ -1792,16 +1795,22 @@ public class Hive { filter = (loadFileType == LoadFileType.REPLACE_ALL) ? 
new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath); } - if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) { + //todo: why is "&& !isAcidIUDoperation" needed here? + if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) { + //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new + // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles); } } @@ -1891,6 +1900,38 @@ public class Hive { } } + /** + * Load Data commands for fullAcid tables write to base_x (if there is overwrite clause) or + * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add + * this path component. + * @param txnId - id of current transaction (in which this operation is running) + * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()} + * @return appropriately modified path + */ + private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException { + switch (loadFileType) { + case REPLACE_ALL: + destPath = new Path(destPath, AcidUtils.baseDir(txnId)); + break; + case KEEP_EXISTING: + destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + break; + case OVERWRITE_EXISTING: + //should not happen here - this is for replication + default: + throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType); + } + try { + FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf()); + if(!FileUtils.mkdir(fs, destPath, conf)) { + LOG.warn(destPath + " already exists?!?!"); + } + AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true); + } catch (IOException e) { + throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e); + } + return destPath; + } private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; @@ -2125,7 +2166,6 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param partSpec * @param loadFileType * @param numDP number of dynamic partitions - * @param listBucketingEnabled * @param isAcid true if this is an ACID operation * @param txnId txnId, can be 0 unless isAcid == true * @return partition map details (PartitionSpec and Partition) @@ -2273,14 +2313,16 @@ private void constructOneLBLocationMap(FileStatus fSta, * if list bucketing enabled * @param hasFollowingStatsTask * if there is any following stats task - * @param isAcid true if this is an ACID based write + * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete */ public void loadTable(Path 
loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, - boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask, - Long txnId, int stmtId, boolean isMmTable) throws HiveException { - + boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, + Long txnId, int stmtId) throws HiveException { List<Path> newFiles = null; Table tbl = getTable(tableName); + assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); + boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl); + boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); HiveConf sessionConf = SessionState.getSessionConf(); if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { newFiles = Collections.synchronizedList(new ArrayList<Path>()); @@ -2298,24 +2340,31 @@ private void constructOneLBLocationMap(FileStatus fSta, newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. - Path tblPath = tbl.getPath(), destPath = tblPath; + Path tblPath = tbl.getPath(); + Path destPath = tblPath; PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; if (isMmTable) { + assert !isAcidIUDoperation; // We will load into MM directory, and delete from the parent if needed. destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); filter = loadFileType == LoadFileType.REPLACE_ALL ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter; } + else if(!isAcidIUDoperation && isFullAcidTable) { + destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl); + } Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath + " (replace = " + loadFileType + ")"); - if (loadFileType == LoadFileType.REPLACE_ALL) { + if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) { + //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 + //todo: should probably do the same for MM IOW boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); - copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid, + copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); @@ -2358,7 +2407,6 @@ private void constructOneLBLocationMap(FileStatus fSta, fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); } - /** * Creates a partition. 
* http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index cd75130..a1b6cda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -391,7 +391,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc loadTableWork = new LoadTableDesc(destPath, Utilities.getTableDesc(table), new TreeMap<>(), replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); Task<?> loadTableTask = TaskFactory.get(mv, x.getConf()); @@ -400,6 +399,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return loadTableTask; } + /** + * todo: this is odd: transactions are opened for all statements. what is this supposed to check? + */ + @Deprecated private static boolean isAcid(Long txnId) { return (txnId != null) && (txnId != 0); } @@ -490,7 +493,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { partSpec.getPartSpec(), replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); Task<?> loadPartTask = TaskFactory.get(new MoveWork( http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 238fbd6..cc956da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -136,7 +136,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { } private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast, - boolean isLocal) throws SemanticException { + boolean isLocal, Table table) throws SemanticException { FileStatus[] srcs = null; @@ -159,6 +159,14 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast, "source contains directory: " + oneSrc.getPath().toString())); } + if(AcidUtils.isFullAcidTable(table)) { + if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) { + //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't be simply + //copied to a table so only allow non-acid files for now + throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME, + oneSrc.getPath().getName(), table.getDbName() + "." 
+ table.getTableName()); + } + } } } catch (IOException e) { // Has to use full name to make sure it does not conflict with @@ -230,11 +238,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { } } - if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) { - throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName()); - } // make sure the arguments make sense - List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); + List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle); // for managed tables, make sure the file formats match if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType()) @@ -277,17 +282,16 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { } Long txnId = null; - int stmtId = 0; - Table tbl = ts.tableHandle; - if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) { + int stmtId = -1; + if (AcidUtils.isAcidTable(ts.tableHandle)) { txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement(); } LoadTableDesc loadTableWork; loadTableWork = new LoadTableDesc(new Path(fromURI), Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId); - loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); if (preservePartitionSpecs){ // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 1fa7b40..4683c9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -47,9 +47,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable { private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map public enum LoadFileType { - REPLACE_ALL, // Remove all existing data before copy/move - KEEP_EXISTING, // If any file exist while copy, then just duplicate the file - OVERWRITE_EXISTING // If any file exist while copy, then just overwrite the file + /** + * This corresponds to INSERT OVERWRITE and REPL LOAD for INSERT OVERWRITE event. + * Remove all existing data before copy/move + */ + REPLACE_ALL, + /** + * This corresponds to INSERT INTO and LOAD DATA. + * If any file exist while copy, then just duplicate the file + */ + KEEP_EXISTING, + /** + * This corresponds to REPL LOAD where if we re-apply the same event then need to overwrite + * the file instead of making a duplicate copy. + * If any file exist while copy, then just overwrite the file + */ + OVERWRITE_EXISTING } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); @@ -215,14 +228,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable { return currentTransactionId == null ? 0 : currentTransactionId; } - public void setTxnId(Long txnId) { - this.currentTransactionId = txnId; - } - public int getStmtId() { return stmtId; } - + //todo: should this not be passed in the c'tor? 
public void setStmtId(int stmtId) { this.stmtId = stmtId; } http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 7d4d379..a804527 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -577,11 +576,16 @@ public class CompactorMR { dir.getName().startsWith(AcidUtils.DELTA_PREFIX) || dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX) + && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format - FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); + FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter + : AcidUtils.bucketFileFilter); for(FileStatus f : files) { // For each file, figure out which bucket it is. - Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); + Matcher matcher = isRawFormat ? + AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()) + : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()); addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap); } } else { @@ -612,8 +616,12 @@ public class CompactorMR { private void addFileToMap(Matcher matcher, Path file, boolean sawBase, Map<Integer, BucketTracker> splitToBucketMap) { if (!matcher.find()) { - LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " + - file.toString() + " Matcher=" + matcher.toString()); + String msg = "Found a non-bucket file that we thought matched the bucket pattern! 
" + + file.toString() + " Matcher=" + matcher.toString(); + LOG.error(msg); + //following matcher.group() would fail anyway and we don't want to skip files since that + //may be a data loss scenario + throw new IllegalArgumentException(msg); } int bucketNum = Integer.parseInt(matcher.group()); BucketTracker bt = splitToBucketMap.get(bucketNum); http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 52257c4..319e0ee 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -344,7 +344,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { //this should fail because txn aborted due to timeout CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5"); Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1")); - + //now test that we don't timeout locks we should not //heartbeater should be running in the background every 1/2 second hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); @@ -354,9 +354,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { runStatementOnDriver("start transaction"); runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17"); pause(750); - + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); - + //since there is txn open, we are heartbeating the txn not individual locks GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo(); Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size()); @@ -377,7 +377,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { //these 2 values are equal when TXN entry is made. 
Should never be equal after 1st heartbeat, which we //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat); - + ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest()); TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks()); pause(750); @@ -525,7 +525,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " + "when matched then update set cur=0 " + "when not matched then insert values(s.key,s.data,1)"; - + //to allow cross join from 'teeCurMatch' + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); runStatementOnDriver(stmt); int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}}; List<String> r = runStatementOnDriver("select * from target order by key,data,cur"); @@ -569,7 +570,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { List<String> r = runStatementOnDriver("select * from target order by key,data,cur"); Assert.assertEquals(stringifyValues(resultVals), r); } - + @Test public void testMergeOnTezEdges() throws Exception { String query = "merge into " + Table.ACIDTBL + http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 17d976a..ab5f969 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -77,7 +77,7 @@ public class TestTxnCommands2 { ).getPath().replaceAll("\\\\", "/"); protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - protected static int BUCKET_COUNT = 2; + static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); @@ -117,12 +117,11 @@ public class TestTxnCommands2 { setUpWithTableProperties("'transactional'='true'"); } - protected void setUpWithTableProperties(String tableProperties) throws Exception { + void setUpWithTableProperties(String tableProperties) throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); - hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -406,7 +405,7 @@ public class TestTxnCommands2 { expectedException.expect(RuntimeException.class); expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); + runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')"); } /** * Test the query 
correctness and directory layout for ACID table conversion