HIVE-17458 VectorizedOrcAcidRowBatchReader doesn't handle 'original' files (Eugene Koifman, reviewed by Sergey Shelukhin)
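The heart of the change is in VectorizedOrcAcidRowBatchReader (see the new computeOffsetAndBucket() and next() code in the diff below): when reading a split of an "original" file -- a data file written before the table was converted to ACID -- the reader now synthesizes ROW__ID values on the fly. Every original row is attributed to transaction 0, the bucket property is encoded from the file's bucket id, and the row id is the row's position within its file plus the row counts of all earlier original files in the same bucket, so each read (and a later major compaction) assigns the same ROW__ID to the same row. A minimal, self-contained sketch of that numbering scheme follows; the class and method names are illustrative only, not Hive's API:

// Illustrative sketch only -- not Hive code. It mimics the numbering scheme that the new
// computeOffsetAndBucket()/next() logic in VectorizedOrcAcidRowBatchReader applies to rows
// read from "original" (pre-ACID) files.
final class SyntheticRowIdSketch {

    // Row counts of the bucket's original files, in the order the directory listing returns them.
    private final long[] rowCountsPerFile;

    SyntheticRowIdSketch(long[] rowCountsPerFile) {
        this.rowCountsPerFile = rowCountsPerFile;
    }

    // ROW__ID = (transactionId, bucketProperty, rowId).
    // All pre-ACID data is treated as written by the committed transaction 0.
    long[] syntheticRowId(int fileIndex, long rowNumberInFile, int bucketId) {
        long rowIdOffset = 0;
        for (int i = 0; i < fileIndex; i++) {
            // rows of earlier files in the same bucket come first in the logical numbering
            rowIdOffset += rowCountsPerFile[i];
        }
        return new long[] {0L, encodeBucketProperty(bucketId, 0 /* statementId */),
            rowIdOffset + rowNumberInFile};
    }

    // Simplified stand-in for org.apache.hadoop.hive.ql.io.BucketCodec.V1:
    // version 1 in the top bits, bucket id at bit 16, statement id in the low bits.
    private static long encodeBucketProperty(int bucketId, int statementId) {
        return (1L << 29) | ((long) bucketId << 16) | statementId;
    }

    public static void main(String[] args) {
        // Two original files in bucket 0 holding 5 and 3 rows: the second row of the second
        // file gets rowId 5 + 1 = 6, and bucket 0 encodes to 536870912 as in the tests below.
        SyntheticRowIdSketch sketch = new SyntheticRowIdSketch(new long[] {5L, 3L});
        long[] rowId = sketch.syntheticRowId(1, 1L, 0);
        System.out.println("transactionid=" + rowId[0]
            + " bucketid=" + rowId[1] + " rowid=" + rowId[2]);
    }
}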
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1649c074 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1649c074 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1649c074 Branch: refs/heads/master Commit: 1649c0741a8c065b4831e8495a2a01b919d8b6d0 Parents: 7006ade Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Sat Nov 4 10:14:04 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Sat Nov 4 10:14:04 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 13 +- .../test/resources/testconfiguration.properties | 6 +- .../org/apache/hadoop/hive/ql/QTestUtil.java | 33 +- .../apache/hadoop/hive/ql/udf/UDFRunWorker.java | 35 + .../hive/llap/io/api/impl/LlapRecordReader.java | 3 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 51 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 2 +- .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 11 +- .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 2 +- .../io/orc/VectorizedOrcAcidRowBatchReader.java | 352 +++++++-- .../ql/io/orc/VectorizedOrcAcidRowReader.java | 143 ---- .../apache/hadoop/hive/ql/TestTxnCommands.java | 6 +- .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 118 +++ .../TestVectorizedOrcAcidRowBatchReader.java | 24 +- .../acid_vectorization_original.q | 138 ++++ .../acid_vectorization_original_tez.q | 125 ++++ .../llap/acid_vectorization_original.q.out | 726 ++++++++++++++++++ .../tez/acid_vectorization_original_tez.q.out | 737 +++++++++++++++++++ 18 files changed, 2232 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cbe4de5..48341a8 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1895,15 +1895,10 @@ public class HiveConf extends Configuration { "hive.lock.numretries and hive.lock.sleep.between.retries."), HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1, - "Sets the operational properties that control the appropriate behavior for various\n" - + "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n" - + "for future versions of ACID. (See HIVE-14035 for details.)\n" - + "0: Turn on the legacy mode for ACID\n" - + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" - + "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n" - + "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" - + "This is intended to be used as an internal property for future versions of ACID. (See\n" + - "HIVE-14035 for details.)"), + "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" + + "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" + + "This is intended to be used as an internal property for future versions of ACID. 
(See\n" + + "HIVE-14035 for details.)"), HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + "current open transactions reach this limit, future open transaction requests will be \n" + http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 462f332..9642697 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1,3 +1,5 @@ +# Note: the *.shared groups also run on TestCliDriver + # NOTE: files should be listed in alphabetical order minimr.query.files=infer_bucket_sort_map_operators.q,\ infer_bucket_sort_dyn_part.q,\ @@ -50,7 +52,8 @@ minitez.query.files.shared=delete_orig_table.q,\ # NOTE: Add tests to minitez only if it is very # specific to tez and cannot be added to minillap. -minitez.query.files=explainuser_3.q,\ +minitez.query.files=acid_vectorization_original_tez.q,\ + explainuser_3.q,\ explainanalyze_1.q,\ explainanalyze_2.q,\ explainanalyze_3.q,\ @@ -474,6 +477,7 @@ minillaplocal.query.files=\ acid_no_buckets.q, \ acid_globallimit.q,\ acid_vectorization_missing_cols.q,\ + acid_vectorization_original.q,\ alter_merge_stats_orc.q,\ authorization_view_8.q,\ auto_join30.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 4477954..6c34c08 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -85,9 +85,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hive.cli.CliDriver; import org.apache.hadoop.hive.cli.CliSessionState; @@ -1561,6 +1559,16 @@ public class QTestUtil { } } } + else { + for (PatternReplacementPair prp : partialPlanMask) { + matcher = prp.pattern.matcher(line); + if (matcher.find()) { + line = line.replaceAll(prp.pattern.pattern(), prp.replacement); + partialMaskWasMatched = true; + break; + } + } + } if (!partialMaskWasMatched) { for (Pair<Pattern, String> pair : patternsWithMaskComments) { @@ -1650,7 +1658,26 @@ public class QTestUtil { "data/warehouse/(.*?/)+\\.hive-staging" // the directory might be db/table/partition //TODO: add more expected test result here }); - + /** + * Pattern to match and (partial) replacement text. + * For example, {"transaction":76,"bucketid":8249877}. We just want to mask 76 but a regex that + * matches just 76 will match a lot of other things. 
+ */ + private final static class PatternReplacementPair { + private final Pattern pattern; + private final String replacement; + PatternReplacementPair(Pattern p, String r) { + pattern = p; + replacement = r; + } + } + private final PatternReplacementPair[] partialPlanMask; + { + ArrayList<PatternReplacementPair> ppm = new ArrayList<>(); + ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"transactionid\":[1-9][0-9]*,\"bucketid\":"), + "{\"transactionid\":### Masked txnid ###,\"bucketid\":")); + partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]); + } /* This list may be modified by specific cli drivers to mask strings that change on every test */ private final List<Pair<Pattern, String>> patternsWithMaskComments = new ArrayList<Pair<Pattern, String>>() {{ add(toPatternPair("(pblob|s3.?|swift|wasb.?).*hive-staging.*","### BLOBSTORE_STAGING_PATH ###")); http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java new file mode 100644 index 0000000..de6aca2 --- /dev/null +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/udf/UDFRunWorker.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.udf; + +import org.apache.hadoop.hive.ql.TestTxnCommands2; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.session.SessionState; + +/** + * A UDF for testing, which does key/value lookup from a file + */ +@Description(name = "runWorker", + value = "_FUNC_() - UDF launching Compaction Worker") +public class UDFRunWorker extends UDF { + public void evaluate() throws Exception { + TestTxnCommands2.runWorker(SessionState.get().getConf()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index d66fac2..5f010be 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -277,7 +278,7 @@ class LlapRecordReader acidVrb.cols = cvb.cols; acidVrb.size = cvb.size; final VectorizedOrcAcidRowBatchReader acidReader = - new VectorizedOrcAcidRowBatchReader(split, jobConf, Reporter.NULL, + new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, Reporter.NULL, new RecordReader<NullWritable, VectorizedRowBatch>() { @Override public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index c364343..1e5b841 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1849,6 +1849,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> getRecordReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { + //CombineHiveInputFormat may produce FileSplit that is not OrcSplit boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf); boolean isAcidRead = isAcidRead(conf, inputSplit); if (!isAcidRead) { @@ -1868,27 +1869,19 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } reporter.setStatus(inputSplit.toString()); + //if here we are now doing an Acid read so must have OrcSplit. 
CombineHiveInputFormat is + // disabled for acid path + OrcSplit split = (OrcSplit) inputSplit; - boolean isFastVectorizedReaderAvailable = - VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, inputSplit); - - if (vectorMode && isFastVectorizedReaderAvailable) { - // Faster vectorized ACID row batch reader is available that avoids row-by-row stitching. + if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) - new VectorizedOrcAcidRowBatchReader(inputSplit, conf, reporter); + new VectorizedOrcAcidRowBatchReader(split, conf, reporter); } Options options = new Options(conf).reporter(reporter); final RowReader<OrcStruct> inner = getReader(inputSplit, options); - if (vectorMode && !isFastVectorizedReaderAvailable) { - // Vectorized regular ACID reader that does row-by-row stitching. - return (org.apache.hadoop.mapred.RecordReader) - new VectorizedOrcAcidRowReader(inner, conf, - Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit); - } else { - // Non-vectorized regular ACID reader. - return new NullKeyRecordReader(inner, conf); - } + // Non-vectorized regular ACID reader. + return new NullKeyRecordReader(inner, conf); } /** @@ -1945,38 +1938,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, throws IOException { final OrcSplit split = (OrcSplit) inputSplit; - final Path path = split.getPath(); - Path root; - if (split.hasBase()) { - if (split.isOriginal()) { - root = split.getRootDir(); - } else { - root = path.getParent().getParent(); - assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() + - " path.p.p=" + root; - } - } else { - throw new IllegalStateException("Split w/o base: " + path); - } // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + if(!acidOperationalProperties.isSplitUpdate()) { + throw new IllegalStateException("Expected SpliUpdate table: " + split.getPath()); + } - // The deltas are decided based on whether split-update has been turned on for the table or not. - // When split-update is turned off, everything in the delta_x_y/ directory should be treated - // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory - // need to be considered as delta, because files in delta_x_y/ will be processed as base files - // since they only have insert events in them. - final Path[] deltas = - acidOperationalProperties.isSplitUpdate() ? 
- AcidUtils.deserializeDeleteDeltas(root, split.getDeltas()) - : AcidUtils.deserializeDeltas(root, split.getDeltas()); + final Path[] deltas = VectorizedOrcAcidRowBatchReader.getDeleteDeltaDirsFromSplit(split); final Configuration conf = options.getConfiguration(); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split); OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false); - mergerOptions.rootPath(root); + mergerOptions.rootPath(split.getRootDir()); final int bucket; if (split.hasBase()) { AcidOutputFormat.Options acidIOOptions = http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index eed6d22..95a60dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -433,7 +433,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); if (bucketOptions.getBucketId() != bucketId) { - continue; + continue;//todo: HIVE-16952 } if (haveSeenCurrentFile) { //if here we already saw current file and now found another file for the same bucket http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 1e19a91..315cc1d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -62,21 +62,24 @@ public class OrcRecordUpdater implements RecordUpdater { private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class); - public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; - public static final String ACID_FORMAT = "_orc_acid_version"; - public static final int ORC_ACID_VERSION = 0; + static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; + private static final String ACID_FORMAT = "_orc_acid_version"; + private static final int ORC_ACID_VERSION = 0; final static int INSERT_OPERATION = 0; final static int UPDATE_OPERATION = 1; final static int DELETE_OPERATION = 2; - + //column indexes of corresponding data in storage layer final static int OPERATION = 0; final static int ORIGINAL_TRANSACTION = 1; final static int BUCKET = 2; final static int ROW_ID = 3; final static int CURRENT_TRANSACTION = 4; final static int ROW = 5; + /** + * total number of fields (above) + */ final static int FIELDS = 6; final static int DELTA_BUFFER_SIZE = 16 * 1024; http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 260a5ac..58638b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -29,7 +29,6 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.ColumnarSplit; @@ -238,6 +237,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit final AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties(conf); final boolean isSplitUpdate = acidOperationalProperties.isSplitUpdate(); + assert isSplitUpdate : "should be true in Hive 3.0"; if (isOriginal) { if (!isAcidRead && !hasDelta) { http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 1e16f09..bcde4fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -24,6 +24,7 @@ import java.util.BitSet; import java.util.Map.Entry; import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -37,9 +38,12 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -51,10 +55,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; /** - * A fast vectorized batch reader class for ACID when split-update behavior is enabled. - * When split-update is turned on, row-by-row stitching could be avoided to create the final - * version of a row. Essentially, there are only insert and delete events. Insert events can be - * directly read from the base files/insert_only deltas in vectorized row batches. The deleted + * A fast vectorized batch reader class for ACID. Insert events are read directly + * from the base files/insert_only deltas in vectorized row batches. The deleted * rows can then be easily indicated via the 'selected' field of the vectorized row batch. * Refer HIVE-14233 for more details. 
*/ @@ -63,26 +65,51 @@ public class VectorizedOrcAcidRowBatchReader private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class); - public org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader; - protected VectorizedRowBatchCtx rbCtx; - protected VectorizedRowBatch vectorizedRowBatchBase; + private org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader; + private final VectorizedRowBatchCtx rbCtx; + private VectorizedRowBatch vectorizedRowBatchBase; private long offset; private long length; protected float progress = 0.0f; protected Object[] partitionValues; - protected boolean addPartitionCols = true; - private ValidTxnList validTxnList; - protected DeleteEventRegistry deleteEventRegistry; - protected StructColumnVector recordIdColumnVector; - private org.apache.orc.Reader.Options readerOptions; + private boolean addPartitionCols = true; + private final ValidTxnList validTxnList; + private final DeleteEventRegistry deleteEventRegistry; + /** + * {@link RecordIdentifier}/{@link VirtualColumn#ROWID} information + */ + private final StructColumnVector recordIdColumnVector; + private final Reader.Options readerOptions; + private final boolean isOriginal; + /** + * something further in the data pipeline wants {@link VirtualColumn#ROWID} + */ + private final boolean rowIdProjected; + /** + * partition/table root + */ + private final Path rootPath; + /** + * for reading "original" files + */ + private final OffsetAndBucketProperty syntheticProps; + /** + * To have access to {@link RecordReader#getRowNumber()} in the underlying file + */ + private RecordReader innerReader; - public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, - Reporter reporter) throws IOException { - this.init(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf)); + VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, + Reporter reporter) throws IOException { + this(inputSplit, conf,reporter, null); + } + @VisibleForTesting + VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, + Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException { + this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. 
- final RecordReader innerReader = reader.rowsOptions(readerOptions.range(offset, length)); + innerReader = reader.rowsOptions(readerOptions.range(offset, length)); baseReader = new org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>() { @Override @@ -117,16 +144,19 @@ public class VectorizedOrcAcidRowBatchReader }; this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch(); } - - public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter, + /** + * LLAP IO c'tor + */ + public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter, org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader, VectorizedRowBatchCtx rbCtx) throws IOException { - this.init(inputSplit, conf, reporter, rbCtx); + this(conf, inputSplit, reporter, rbCtx); this.baseReader = baseReader; + this.innerReader = null; this.vectorizedRowBatchBase = baseReader.createValue(); } - private void init(InputSplit inputSplit, JobConf conf, Reporter reporter, + private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter, VectorizedRowBatchCtx rowBatchCtx) throws IOException { this.rbCtx = rowBatchCtx; final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN); @@ -143,8 +173,7 @@ public class VectorizedOrcAcidRowBatchReader final OrcSplit orcSplit = (OrcSplit) inputSplit; reporter.setStatus(orcSplit.toString()); - readerOptions = OrcInputFormat.createOptionsForReader(conf); - readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions); + readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf)); this.offset = orcSplit.getStart(); this.length = orcSplit.getLength(); @@ -167,55 +196,161 @@ public class VectorizedOrcAcidRowBatchReader deleteEventReaderOptions.range(0, Long.MAX_VALUE); // Disable SARGs for deleteEventReaders, as SARGs have no meaning. deleteEventReaderOptions.searchArgument(null, null); + DeleteEventRegistry der; try { // See if we can load all the delete events from all the delete deltas in memory... - this.deleteEventRegistry = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new ColumnizedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } catch (DeleteEventsOverflowMemoryException e) { // If not, then create a set of hanging readers that do sort-merge to find the next smallest // delete event on-demand. Caps the memory consumption to (some_const * no. of readers). - this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); + der = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } - - recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + this.deleteEventRegistry = der; + isOriginal = orcSplit.isOriginal(); + if(isOriginal) { + recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + new LongColumnVector(), new LongColumnVector(), new LongColumnVector()); + } + else { + //will swap in the Vectors from underlying row batch + recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + } + rowIdProjected = areRowIdsProjected(rbCtx); + rootPath = orcSplit.getRootDir(); + syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList); } /** - * Returns whether it is possible to create a valid instance of this class for a given split. 
- * @param conf is the job configuration - * @param inputSplit - * @return true if it is possible, else false. + * Used for generating synthetic ROW__IDs for reading "original" files + */ + private static final class OffsetAndBucketProperty { + private final long rowIdOffset; + private final int bucketProperty; + private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) { + this.rowIdOffset = rowIdOffset; + this.bucketProperty = bucketProperty; + } + } + /** + * See {@link #next(NullWritable, VectorizedRowBatch)} fist and + * {@link OrcRawRecordMerger.OriginalReaderPair}. + * When reading a split of an "original" file and we need to decorate data with ROW__ID. + * This requires treating multiple files that are part of the same bucket (tranche for unbucketed + * tables) as a single logical file to number rowids consistently. + * + * todo: This logic is executed per split of every "original" file. The computed result is the + * same for every split form the same file so this could be optimized by moving it to + * before/during splt computation and passing the info in the split. (HIVE-17917) + */ + private OffsetAndBucketProperty computeOffsetAndBucket( + OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException { + if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) { + return new OffsetAndBucketProperty(0,0); + } + long rowIdOffset = 0; + int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId(); + int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId)); + AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf, + validTxnList, false, true); + for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { + AcidOutputFormat.Options bucketOptions = + AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); + if (bucketOptions.getBucketId() != bucketId) { + continue;//HIVE-16952 + } + if (f.getFileStatus().getPath().equals(split.getPath())) { + //'f' is the file whence this split is + break; + } + Reader reader = OrcFile.createReader(f.getFileStatus().getPath(), + OrcFile.readerOptions(conf)); + rowIdOffset += reader.getNumberOfRows(); + } + return new OffsetAndBucketProperty(rowIdOffset, bucketProperty); + } + /** + * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables. + * In some cases this cannot be used from LLAP IO elevator because + * {@link RecordReader#getRowNumber()} is not (currently) available there but is required to + * generate ROW__IDs for "original" files + * @param hasDeletes - if there are any deletes that apply to this split + * todo: HIVE-17944 */ - public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) { - if (!(inputSplit instanceof OrcSplit)) { - return false; // must be an instance of OrcSplit. - } - // First check if we are reading any original files in the split. - // To simplify the vectorization logic, the vectorized acid row batch reader does not handle - // original files for now as they have a different schema than a regular ACID file. - final OrcSplit split = (OrcSplit) inputSplit; - if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) { - // When split-update is turned on for ACID, a more optimized vectorized batch reader - // can be created. But still only possible when we are *NOT* reading any originals. 
+ static boolean canUseLlapForAcid(OrcSplit split, boolean hasDeletes, Configuration conf) { + if(!split.isOriginal()) { return true; } - return false; // no split-update or possibly reading originals! + VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + if(rbCtx == null) { + throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath()); + } + return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx)); } - private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { + /** + * Does this reader need to decorate rows with ROW__IDs (for "original" reads). + * Even if ROW__ID is not projected you still need to decorate the rows with them to see if + * any of the delete events apply. + */ + private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) { + return split.isOriginal() && (hasDeletes || rowIdProjected); + } + private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) { + if(rbCtx.getVirtualColumnCount() == 0) { + return false; + } + for(VirtualColumn vc : rbCtx.getNeededVirtualColumns()) { + if(vc == VirtualColumn.ROWID) { + //The query needs ROW__ID: maybe explicitly asked, maybe it's part of + // Update/Delete statement. + //Either way, we need to decorate "original" rows with row__id + return true; + } + } + return false; + } + static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException { Path path = orcSplit.getPath(); Path root; if (orcSplit.hasBase()) { if (orcSplit.isOriginal()) { - root = path.getParent(); + root = orcSplit.getRootDir(); } else { root = path.getParent().getParent(); + assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() + + " path.p.p=" + root; } } else { - root = path; + throw new IllegalStateException("Split w/o base w/Acid 2.0??: " + path); } return AcidUtils.deserializeDeleteDeltas(root, orcSplit.getDeltas()); } + /** + * There are 2 types of schema from the {@link #baseReader} that this handles. In the case + * the data was written to a transactional table from the start, every row is decorated with + * transaction related info and looks like <op, otid, writerId, rowid, ctid, <f1, ... fn>>. + * + * The other case is when data was written to non-transactional table and thus only has the user + * data: <f1, ... fn>. Then this table was then converted to a transactional table but the data + * files are not changed until major compaction. These are the "original" files. + * + * In this case we may need to decorate the outgoing data with transactional column values at + * read time. (It's done somewhat out of band via VectorizedRowBatchCtx - ask Teddy Choi). + * The "otid, writerId, rowid" columns represent {@link RecordIdentifier}. They are assigned + * each time the table is read in a way that needs to project {@link VirtualColumn#ROWID}. + * Major compaction will attach these values to each row permanently. + * It's critical that these generated column values are assigned exactly the same way by each + * read of the same row and by the Compactor. + * See {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorMR} and + * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} for the Compactor read path. + * (Longer term should make compactor use this class) + * + * This only decorates original rows with metadata if something above is requesting these values + * or if there are Delete events to apply. 
+ * + * @return false where there is no more data, i.e. {@code value} is empty + */ @Override public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { try { @@ -257,12 +392,60 @@ public class VectorizedOrcAcidRowBatchReader // When selectedInUse is set to false, everything in the batch is selected. selectedBitSet.set(0, vectorizedRowBatchBase.size, true); } - - // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols; + if(isOriginal) { + /* + * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order + * to see if any deletes apply + */ + if(rowIdProjected || !deleteEventRegistry.isEmpty()) { + if(innerReader == null) { + throw new IllegalStateException(getClass().getName() + " requires " + + org.apache.orc.RecordReader.class + + " to handle original files that require ROW__IDs: " + rootPath); + } + /** + * {@link RecordIdentifier#getTransactionId()} + */ + recordIdColumnVector.fields[0].noNulls = true; + recordIdColumnVector.fields[0].isRepeating = true; + //all "original" is considered written by txnid:0 which committed + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0; + /** + * This is {@link RecordIdentifier#getBucketProperty()} + * Also see {@link BucketCodec} + */ + recordIdColumnVector.fields[1].noNulls = true; + recordIdColumnVector.fields[1].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; + /** + * {@link RecordIdentifier#getRowId()} + */ + recordIdColumnVector.fields[2].noNulls = true; + recordIdColumnVector.fields[2].isRepeating = false; + long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector; + for(int i = 0; i < vectorizedRowBatchBase.size; i++) { + //baseReader.getRowNumber() seems to point at the start of the batch todo: validate + rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; + } + //Now populate a structure to use to apply delete events + innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; + innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0]; + innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; + innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + } + } + else { + // Case 1- find rows which belong to transactions that are not valid. + findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + /** + * All "original" data belongs to txnid:0 and is always valid/committed for every reader + * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal + */ + } // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase.cols, + this.deleteEventRegistry.findDeletedRecords(innerRecordIdColumnVector, vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { @@ -283,30 +466,39 @@ public class VectorizedOrcAcidRowBatchReader } } - // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. 
- // NOTE: We only link up the user columns and not the ACID metadata columns because this - // vectorized code path is not being used in cases of update/delete, when the metadata columns - // would be expected to be passed up the operator pipeline. This is because - // currently the update/delete specifically disable vectorized code paths. - // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() - StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; - // Transfer columnVector objects from base batch to outgoing batch. - System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); - if (rbCtx != null) { - recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]; - recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; - recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; + if(isOriginal) { + /*Just copy the payload. {@link recordIdColumnVector} has already been populated*/ + System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, + value.getDataColumnCount()); + } + else { + // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. + // NOTE: We only link up the user columns and not the ACID metadata columns because this + // vectorized code path is not being used in cases of update/delete, when the metadata columns + // would be expected to be passed up the operator pipeline. This is because + // currently the update/delete specifically disable vectorized code paths. + // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode() + StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; + // Transfer columnVector objects from base batch to outgoing batch. + System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); + if(rowIdProjected) { + recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]; + recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; + recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; + } + } + if(rowIdProjected) { rbCtx.setRecordIdColumnVector(recordIdColumnVector); } progress = baseReader.getProgress(); return true; } - protected void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { + private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); } - protected void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { + private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. 
@@ -387,6 +579,11 @@ public class VectorizedOrcAcidRowBatchReader * @throws IOException */ public void close() throws IOException; + + /** + * @return {@code true} if no delete events were found + */ + boolean isEmpty(); } /** @@ -400,10 +597,10 @@ public class VectorizedOrcAcidRowBatchReader private OrcRawRecordMerger deleteRecords; private OrcRawRecordMerger.ReaderKey deleteRecordKey; private OrcStruct deleteRecordValue; - private boolean isDeleteRecordAvailable = true; + private Boolean isDeleteRecordAvailable = null; private ValidTxnList validTxnList; - public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) + SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) throws IOException { final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit); if (deleteDeltas.length > 0) { @@ -428,6 +625,13 @@ public class VectorizedOrcAcidRowBatchReader } @Override + public boolean isEmpty() { + if(isDeleteRecordAvailable == null) { + throw new IllegalStateException("Not yet initialized"); + } + return !isDeleteRecordAvailable; + } + @Override public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { @@ -546,7 +750,7 @@ public class VectorizedOrcAcidRowBatchReader */ private int bucketProperty; private long rowId; - public DeleteRecordKey() { + DeleteRecordKey() { this.originalTransactionId = -1; this.rowId = -1; } @@ -596,7 +800,7 @@ public class VectorizedOrcAcidRowBatchReader private boolean isBucketPropertyRepeating; private final boolean isBucketedTable; - public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, + DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket, ValidTxnList validTxnList, boolean isBucketedTable) throws IOException { this.recordReader = deleteDeltaReader.rowsOptions(readerOptions); this.bucketForSplit = bucket; @@ -741,8 +945,9 @@ public class VectorizedOrcAcidRowBatchReader private long rowIds[]; private CompressedOtid compressedOtids[]; private ValidTxnList validTxnList; + private Boolean isEmpty = null; - public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, + ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException { int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId(); String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); @@ -804,6 +1009,7 @@ public class VectorizedOrcAcidRowBatchReader readAllDeleteEventsFromDeleteDeltas(); } } + isEmpty = compressedOtids == null || rowIds == null; } catch(IOException|DeleteEventsOverflowMemoryException e) { close(); // close any open readers, if there was some exception during initialization. throw e; // rethrow the exception so that the caller can handle. 
@@ -910,7 +1116,13 @@ public class VectorizedOrcAcidRowBatchReader } return false; } - + @Override + public boolean isEmpty() { + if(isEmpty == null) { + throw new IllegalStateException("Not yet initialized"); + } + return isEmpty; + } @Override public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java deleted file mode 100644 index 885ef83..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io.orc; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.AcidInputFormat; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.*; - -import java.io.IOException; - -/** - * Implement a RecordReader that stitches together base and delta files to - * support tables and partitions stored in the ACID format. It works by using - * the non-vectorized ACID reader and moving the data into a vectorized row - * batch. 
- */ -public class VectorizedOrcAcidRowReader - implements org.apache.hadoop.mapred.RecordReader<NullWritable, - VectorizedRowBatch> { - private final AcidInputFormat.RowReader<OrcStruct> innerReader; - private final RecordIdentifier key; - private final OrcStruct value; - private VectorizedRowBatchCtx rbCtx; - private Object[] partitionValues; - private final ObjectInspector objectInspector; - private final DataOutputBuffer buffer = new DataOutputBuffer(); - private final StructColumnVector recordIdColumnVector; - private final LongColumnVector transactionColumnVector; - private final LongColumnVector bucketColumnVector; - private final LongColumnVector rowIdColumnVector; - - public VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner, - Configuration conf, VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split) - throws IOException { - this.innerReader = inner; - this.key = inner.createKey(); - rbCtx = vectorizedRowBatchCtx; - int partitionColumnCount = rbCtx.getPartitionColumnCount(); - if (partitionColumnCount > 0) { - partitionValues = new Object[partitionColumnCount]; - rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); - } - this.value = inner.createValue(); - this.objectInspector = inner.getObjectInspector(); - this.transactionColumnVector = new LongColumnVector(); - this.bucketColumnVector = new LongColumnVector(); - this.rowIdColumnVector = new LongColumnVector(); - this.recordIdColumnVector = - new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, - transactionColumnVector, bucketColumnVector, rowIdColumnVector); - } - - @Override - public boolean next(NullWritable nullWritable, - VectorizedRowBatch vectorizedRowBatch - ) throws IOException { - vectorizedRowBatch.reset(); - buffer.reset(); - if (!innerReader.next(key, value)) { - return false; - } - if (partitionValues != null) { - rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues); - } - try { - VectorizedBatchUtil.acidAddRowToBatch(value, - (StructObjectInspector) objectInspector, - vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer); - addRecordId(vectorizedRowBatch.size, key); - vectorizedRowBatch.size++; - while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length && - innerReader.next(key, value)) { - VectorizedBatchUtil.acidAddRowToBatch(value, - (StructObjectInspector) objectInspector, - vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer); - addRecordId(vectorizedRowBatch.size, key); - vectorizedRowBatch.size++; - } - rbCtx.setRecordIdColumnVector(recordIdColumnVector); - } catch (Exception e) { - throw new IOException("error iterating", e); - } - return true; - } - - private void addRecordId(int index, RecordIdentifier key) { - transactionColumnVector.vector[index] = key.getTransactionId(); - bucketColumnVector.vector[index] = key.getBucketProperty(); - rowIdColumnVector.vector[index] = key.getRowId(); - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - return rbCtx.createVectorizedRowBatch(); - } - - @Override - public long getPos() throws IOException { - return innerReader.getPos(); - } - - @Override - public void close() throws IOException { - innerReader.close(); - } - - @Override - public float getProgress() throws IOException { - return innerReader.getProgress(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 662462c..2c76f79 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -56,9 +56,9 @@ import java.util.concurrent.TimeUnit; * test AC=true, and AC=false with commit/rollback/exception and test resulting data. * * Can also test, calling commit in AC=true mode, etc, toggling AC... - * - * Tests here are for multi-statement transactions (WIP) and those that don't need to - * run with Acid 2.0 (see subclasses of TestTxnCommands2) + * + * Tests here are for multi-statement transactions (WIP) and others + * Mostly uses bucketed tables */ public class TestTxnCommands extends TxnCommandsBaseForTests { static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class); http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index c827dc4..f0d9ff2 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -522,5 +526,119 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver // Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode()); Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support")); } + /** + * Tests to check that we are able to use vectorized acid reader, + * VectorizedOrcAcidRowBatchReader, when reading "original" files, + * i.e. those that were written before the table was converted to acid. 
+ * See also acid_vectorization_original*.q + */ + @Test + public void testNonAcidToAcidVectorzied() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + //this enables vectorization of ROW__ID + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T(a int, b int) stored as orc"); + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values)); + //, 'transactional_properties'='default' + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')"); + //Execution mode: vectorized + //this uses VectorizedOrcAcidRowBatchReader + String query = "select a from T where b > 6 order by a"; + List<String> rs = runStatementOnDriver(query); + String[][] expected = { + {"6", ""}, + {"9", ""}, + }; + checkExpected(rs, expected, "After conversion"); + Assert.assertEquals(Integer.toString(6), rs.get(0)); + Assert.assertEquals(Integer.toString(9), rs.get(1)); + assertVectorized(true, query); + + //why isn't PPD working.... - it is working but storage layer doesn't do row level filtering; only row group level + //this uses VectorizedOrcAcidRowBatchReader + query = "select ROW__ID, a from T where b > 6 order by a"; + rs = runStatementOnDriver(query); + String[][] expected1 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} + }; + checkExpected(rs, expected1, "After conversion with VC1"); + assertVectorized(true, query); + + //this uses VectorizedOrcAcidRowBatchReader + query = "select ROW__ID, a from T where b > 0 order by a"; + rs = runStatementOnDriver(query); + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}", "1"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}", "2"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}", "5"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}", "6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} + }; + checkExpected(rs, expected2, "After conversion with VC2"); + assertVectorized(true, query); + + //doesn't vectorize (uses neither of the Vectorzied Acid readers) + query = "select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a"; + rs = runStatementOnDriver(query); + Assert.assertEquals("", 2, rs.size()); + String[][] expected3 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"} + }; + checkExpected(rs, expected3, "After non-vectorized read"); + Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); + //vectorized because there is INPUT__FILE__NAME + assertVectorized(false, query); + + runStatementOnDriver("update T set b = 17 where a = 1"); + //this should use VectorizedOrcAcidRowReader + query = "select ROW__ID, b from T where b > 0 order by a"; + rs = runStatementOnDriver(query); + String[][] expected4 = { + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}","17"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}","4"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}","6"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}","8"}, + 
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}","10"} + }; + checkExpected(rs, expected4, "After conversion with VC4"); + assertVectorized(true, query); + + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); + + //this should not vectorize at all + query = "select ROW__ID, a, b, INPUT__FILE__NAME from T where b > 0 order by a, b"; + rs = runStatementOnDriver(query); + String[][] expected5 = {//the row__ids are the same after compaction + {"{\"transactionid\":25,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "warehouse/t/base_0000025/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4", "warehouse/t/base_0000025/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_0000025/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8", "warehouse/t/base_0000025/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_0000025/bucket_00000"} + }; + checkExpected(rs, expected5, "After major compaction"); + //vectorized because there is INPUT__FILE__NAME + assertVectorized(false, query); + } + private void assertVectorized(boolean vectorized, String query) throws Exception { + List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); + for(String line : rs) { + if(line != null && line.contains("Execution mode: vectorized")) { + Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); + return; + } + } + Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 43e0a4a..b2ac687 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; @@ -100,10 +101,11 @@ public class TestVectorizedOrcAcidRowBatchReader { conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true); conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); - fs = FileSystem.getLocal(conf); Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test" + File.separator + "tmp")); 
root = new Path(workDir, "TestVectorizedOrcAcidRowBatch.testDump"); + fs = root.getFileSystem(conf); + root = fs.makeQualified(root); fs.delete(root, true); ObjectInspector inspector; synchronized (TestOrcFile.class) { @@ -189,7 +191,7 @@ public class TestVectorizedOrcAcidRowBatchReader { assertEquals(1, splitStrategies.size()); List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits(); assertEquals(1, splits.size()); - assertEquals("file://" + root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000", + assertEquals(root.toUri().toString() + File.separator + "delta_0000001_0000010_0000/bucket_00000", splits.get(0).getPath().toUri().toString()); assertFalse(splits.get(0).isOriginal()); return splits; @@ -216,7 +218,7 @@ public class TestVectorizedOrcAcidRowBatchReader { // are being handled properly. conf.set(ValidTxnList.VALID_TXNS_KEY, "14:1:1:5"); // Exclude transaction 5 - VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL); + VectorizedOrcAcidRowBatchReader vectorizedReader = new VectorizedOrcAcidRowBatchReader(splits.get(0), conf, Reporter.NULL, new VectorizedRowBatchCtx()); if (deleteEventRegistry.equals(ColumnizedDeleteEventRegistry.class.getName())) { assertTrue(vectorizedReader.getDeleteEventRegistry() instanceof ColumnizedDeleteEventRegistry); } @@ -242,20 +244,4 @@ public class TestVectorizedOrcAcidRowBatchReader { } } } - - @Test - public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception { - OrcSplit mockSplit = Mockito.mock(OrcSplit.class); - - conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, - AcidUtils.AcidOperationalProperties.getDefault().toInt()); - Mockito.when(mockSplit.isOriginal()).thenReturn(true); - // Test false when trying to create a vectorized ACID row batch reader when reading originals. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); - - // A positive test case. 
- Mockito.when(mockSplit.isOriginal()).thenReturn(false); - assertTrue(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original.q b/ql/src/test/queries/clientpositive/acid_vectorization_original.q new file mode 100644 index 0000000..ddf138d --- /dev/null +++ b/ql/src/test/queries/clientpositive/acid_vectorization_original.q @@ -0,0 +1,138 @@ +set hive.mapred.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.vectorized.execution.enabled=true; +-- enables vectorization of VirtualColumn.ROWID +set hive.vectorized.row.identifier.enabled=true; +-- enable ppd +set hive.optimize.index.filter=true; + +set hive.explain.user=false; + +-- needed for TestCliDriver but not TestMiniTezCliDriver +-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; + +-- attempts to get compaction to run - see below +-- set yarn.scheduler.maximum-allocation-mb=2024; +-- set hive.tez.container.size=500; +-- set mapreduce.map.memory.mb=500; +-- set mapreduce.reduce.memory.mb=500; +-- set mapred.job.map.memory.mb=500; +-- set mapred.job.reduce.memory.mb=500; + + + +CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker'; +create table mydual(a int); +insert into mydual values(1); + +CREATE TABLE over10k(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + `dec` decimal(4,2), + bin binary) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' +STORED AS TEXTFILE; + +--oddly this has 9999 rows not > 10K +LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k; + +CREATE TABLE over10k_orc_bucketed(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + `dec` decimal(4,2), + bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC; + +-- this produces about 250 distinct values across all 4 equivalence classes +select distinct si, si%4 from over10k order by si; + +-- explain insert into over10k_orc_bucketed select * from over10k cluster by si; +-- w/o "cluster by" all data is written to 000000_0 +insert into over10k_orc_bucketed select * from over10k cluster by si; + +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed; +-- create copy_N files +insert into over10k_orc_bucketed select * from over10k cluster by si; + +-- the output of this is masked in .out - it is visible in .orig +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed; + +--this actually shows the data files in the .out on Tez but not LLAP +select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed; + +-- convert table to acid +alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true'); + +-- this should vectorize (and push predicate to storage: filterExpr in TableScan ) +-- Execution mode: vectorized, llap +-- LLAP IO: may be used (ACID table) +explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i; +select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i; + +-- this should vectorize (and push
predicate to storage: filterExpr in TableScan ) +-- Execution mode: vectorized, llap +-- LLAP IO: may be used (ACID table) +explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; +-- HIVE-17943 +select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- this should vectorize (and push predicate to storage: filterExpr in TableScan ) +-- Execution mode: vectorized, llap +-- LLAP IO: may be used (ACID table) +explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100; +update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100; + +-- this should produce the same result (data) as previous time this exact query ran +-- ROW__ID will be different (same bucketProperty) +select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- The idea below was to do check sum queries to ensure that ROW__IDs are unique +-- to run Compaction and to check that ROW__IDs are the same before and after compaction (for rows +-- w/o applicable delete events) +-- Everything below is commented out because it doesn't work +-- group by ROW__ID produces wrong results +-- compactor doesn't run - see error below... + +-- this doesn't vectorize but uses llap, which is perhaps the problem if it's using the cache +-- use explain VECTORIZATION DETAIL to see +-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported +explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1; + +-- this tests that there are no duplicate ROW__IDs so it should produce no output +-- on LLAP this produces "NULL, 6"; on tez it produces nothing: HIVE-17921 +select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1; +-- this produces nothing (as it should) +select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null; + +-- schedule compactor +-- alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500"); + + +-- run compactor - this currently fails with +-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512 +-- HIVE-17922 +-- select runWorker() from mydual; + +-- show compactions; + +-- this should produce the same result (data + ROW__ID) as previous time this exact query ran +-- select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- this tests that there are no duplicate ROW__IDs so it should produce no output +-- select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1; + +-- select ROW__ID, * from over10k_orc_bucketed where ROW__ID is null; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/1649c074/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q new file mode 100644 index 0000000..4d93662 --- /dev/null +++ b/ql/src/test/queries/clientpositive/acid_vectorization_original_tez.q @@ -0,0 +1,125 @@ +set hive.mapred.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +set
hive.exec.dynamic.partition.mode=nonstrict; +set hive.vectorized.execution.enabled=true; +-- enables vectorization of VirtualColumn.ROWID +set hive.vectorized.row.identifier.enabled=true; +-- enable ppd +set hive.optimize.index.filter=true; + +set hive.explain.user=false; + +-- needed for TestCliDriver but not TestMiniTezCliDriver +-- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; + +-- attempts to get compaction to run - see below +-- set yarn.scheduler.maximum-allocation-mb=2024; +-- set hive.tez.container.size=500; +-- set mapreduce.map.memory.mb=500; +-- set mapreduce.reduce.memory.mb=500; +-- set mapred.job.map.memory.mb=500; +-- set mapred.job.reduce.memory.mb=500; + + + +CREATE TEMPORARY FUNCTION runWorker AS 'org.apache.hadoop.hive.ql.udf.UDFRunWorker'; +create table mydual(a int); +insert into mydual values(1); + +CREATE TABLE over10k(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + `dec` decimal(4,2), + bin binary) +ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' +STORED AS TEXTFILE; + +--oddly this has 9999 rows not > 10K +LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE over10k; + +CREATE TABLE over10k_orc_bucketed(t tinyint, + si smallint, + i int, + b bigint, + f float, + d double, + bo boolean, + s string, + ts timestamp, + `dec` decimal(4,2), + bin binary) CLUSTERED BY(si) INTO 4 BUCKETS STORED AS ORC; + +-- this produces about 250 distinct values across all 4 equivalence classes +select distinct si, si%4 from over10k order by si; + +-- explain insert into over10k_orc_bucketed select * from over10k cluster by si; +-- w/o "cluster by" all data is written to 000000_0 +insert into over10k_orc_bucketed select * from over10k cluster by si; + +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed; +-- create copy_N files +insert into over10k_orc_bucketed select * from over10k cluster by si; + +-- the output of this is masked in .out - it is visible in .orig +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/over10k_orc_bucketed; + +--this actually shows the data files in the .out on Tez but not LLAP +select distinct 7 as seven, INPUT__FILE__NAME from over10k_orc_bucketed; + +-- convert table to acid +alter table over10k_orc_bucketed set TBLPROPERTIES ('transactional'='true'); + +-- this should vectorize (and push predicate to storage: filterExpr in TableScan ) +-- Execution mode: vectorized (both Map and Reducer) +explain select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i; +select t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by t, si, i; + +-- this should vectorize (and push predicate to storage: filterExpr in TableScan ) +-- Execution mode: vectorized +explain select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; +select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- this should vectorize (and push predicate to storage: filterExpr in TableScan ) +-- same as above +explain update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100; +update over10k_orc_bucketed set i = 0 where b = 4294967363 and t < 100; + +-- this should produce the same result (data) as previous time this exact query ran +-- ROW__ID will be different (same bucketProperty) +select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- The idea below was to do check
sum queries to ensure that ROW__IDs are unique +-- to run Compaction and to check that ROW__IDs are the same before and after compaction (for rows +-- w/o applicable delete events) + +-- this doesn't vectorize +-- use explain VECTORIZATION DETAIL to see +-- notVectorizedReason: Key expression for GROUPBY operator: Vectorizing complex type STRUCT not supported +explain select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1; + +-- this tests that there are no duplicate ROW__IDs so it should produce no output +select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1; + +-- schedule compactor +alter table over10k_orc_bucketed compact 'major' WITH OVERWRITE TBLPROPERTIES ("compactor.mapreduce.map.memory.mb"="500","compactor.hive.tez.container.size"="500"); + + +-- run compactor - this currently fails with +-- Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512 +-- select runWorker() from mydual; + +-- show compactions; + +-- this should produce the same (data + ROW__ID) as before compaction +select ROW__ID, t, si, i from over10k_orc_bucketed where b = 4294967363 and t < 100 order by ROW__ID; + +-- this tests that there are no duplicate ROW__IDs so it should produce no output +select ROW__ID, count(*) from over10k_orc_bucketed group by ROW__ID having count(*) > 1;
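
To summarize what the two .q files above exercise (a hedged sketch, not part of the patch; the table and column names src_orc, a, b below are illustrative only): a non-transactional bucketed ORC table is converted to ACID in place, so rows written before the conversion sit in 'original' files with no ACID metadata; their ROW__IDs are synthesized at read time (transactionid 0), and those ROW__IDs must be unique and must come back unchanged after a major compaction.

-- minimal sketch of that flow, assuming a toy table src_orc
create table src_orc(a int, b int) clustered by (a) into 2 buckets stored as orc;
insert into src_orc values (1, 2), (3, 4);
-- convert in place; the files written above become 'original' files
alter table src_orc set tblproperties ('transactional'='true');
-- ROW__IDs for pre-conversion rows are synthesized at read time with transactionid 0
select ROW__ID, a, b from src_orc order by ROW__ID;
-- uniqueness check: no ROW__ID may repeat, so this should return nothing
select ROW__ID, count(*) from src_orc group by ROW__ID having count(*) > 1;
-- schedule a major compaction (it runs asynchronously; the tests above drive the Worker explicitly)
alter table src_orc compact 'major';
-- after compaction the same rows must report the same ROW__IDs as before
select ROW__ID, a, b from src_orc order by ROW__ID;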