Repository: hive Updated Branches: refs/heads/master ce2754daf -> 16225d204
HIVE-20302: LLAP: non-vectorized execution in IO ignores virtual columns, including ROW__ID (Jesus Camacho Rodriguez, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/16225d20 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/16225d20 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/16225d20 Branch: refs/heads/master Commit: 16225d204bf5acdd7ac08b3f1d3b8c5634f006f2 Parents: ce2754d Author: Jesus Camacho Rodriguez <jcama...@apache.org> Authored: Fri Aug 3 15:22:16 2018 -0700 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Fri Aug 3 19:01:11 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/BatchToRowReader.java | 28 +++++- .../hive/ql/io/orc/OrcOiBatchToRowReader.java | 40 ++++++++- .../queries/clientpositive/orc_llap_nonvector.q | 11 +++ .../llap/orc_llap_nonvector.q.out | 94 ++++++++++++++++++++ 4 files changed, 167 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java index 434a5b8..9d1d260 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.llap.DebugUtils; import java.util.Arrays; -import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.serde2.io.HiveCharWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; @@ -69,6 +70,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.RecordReader; /** @@ -89,6 +91,8 @@ public abstract class BatchToRowReader<StructType, UnionType> private final boolean[] included; private int rowInBatch = 0; + private final int rowIdIdx; + public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader, VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) { this.vrbReader = vrbReader; @@ -104,6 +108,11 @@ public abstract class BatchToRowReader<StructType, UnionType> } else { Arrays.fill(included, true); } + // Create struct for ROW__ID virtual column and extract index + this.rowIdIdx = vrbCtx.findVirtualColumnNum(VirtualColumn.ROWID); + if (this.rowIdIdx >= 0) { + included[rowIdIdx] = true; + } if (LOG.isDebugEnabled()) { LOG.debug("Including the columns " + DebugUtils.toString(included)); } @@ -113,9 +122,11 @@ public abstract class BatchToRowReader<StructType, UnionType> protected abstract StructType createStructObject(Object previous, List<TypeInfo> childrenTypes); protected abstract void setStructCol(StructType structObj, int i, Object value); protected abstract Object getStructCol(StructType structObj, int i); + protected abstract int getStructLength(StructType structObj); protected abstract UnionType createUnionObject(List<TypeInfo> childrenTypes, Object previous); protected abstract void setUnion(UnionType unionObj, byte tag, Object object); protected abstract Object getUnionField(UnionType unionObj); + protected abstract void populateRecordIdentifier(StructType o); @Override public NullWritable createKey() { @@ -138,17 +149,26 @@ public abstract class BatchToRowReader<StructType, UnionType> } @Override + @SuppressWarnings("unchecked") public boolean next(NullWritable key, Object previous) throws IOException { if (!ensureBatch()) { return false; } - @SuppressWarnings("unchecked") - StructType value = (StructType)previous; + + if (this.rowIdIdx >= 0) { + populateRecordIdentifier(null); + } + + StructType value = (StructType) previous; for (int i = 0; i < schema.size(); ++i) { - if (!included[i]) continue; // TODO: shortcut for last col below length? + if (!included[i] || i >= getStructLength(value)) continue; try { setStructCol(value, i, nextValue(batch.cols[i], rowInBatch, schema.get(i), getStructCol(value, i))); + if (i == rowIdIdx) { + // Populate key + populateRecordIdentifier((StructType) getStructCol(value, i)); + } } catch (Throwable t) { LOG.error("Error at row " + rowInBatch + "/" + batch.size + ", column " + i + "/" + schema.size() + " " + batch.cols[i], t); http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java index c9ff592..bfd6eae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java @@ -19,24 +19,37 @@ package org.apache.hadoop.hive.ql.io.orc; import java.util.List; +import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordIdentifier.Field; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.RecordReader; /** BatchToRowReader that returns the rows readable by ORC IOs. */ -public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> { +public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> + implements AcidRecordReader<NullWritable, Object> { + + private final RecordIdentifier recordIdentifier; + private boolean isNull; + public OrcOiBatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader, VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) { super(vrbReader, vrbCtx, includedCols); + this.recordIdentifier = new RecordIdentifier(); + this.isNull = true; } @Override protected OrcStruct createStructObject(Object previous, List<TypeInfo> childrenTypes) { int numChildren = childrenTypes.size(); - if (!(previous instanceof OrcStruct)) { + if (previous == null || !(previous instanceof OrcStruct)) { return new OrcStruct(numChildren); } OrcStruct result = (OrcStruct) previous; @@ -45,6 +58,11 @@ public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> } @Override + protected int getStructLength(OrcStruct structObj) { + return structObj.getNumFields(); + } + + @Override protected OrcUnion createUnionObject(List<TypeInfo> childrenTypes, Object previous) { return (previous instanceof OrcUnion) ? (OrcUnion)previous : new OrcUnion(); } @@ -68,4 +86,22 @@ public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> protected void setUnion(OrcUnion unionObj, byte tag, Object object) { unionObj.set(tag, object); } + + @Override + protected void populateRecordIdentifier(OrcStruct rowId) { + if (rowId == null) { + this.isNull = true; + return; + } + recordIdentifier.setValues(((LongWritable) rowId.getFieldValue(Field.writeId.ordinal())).get(), + ((IntWritable) rowId.getFieldValue(Field.bucketId.ordinal())).get(), + ((LongWritable) rowId.getFieldValue(Field.rowId.ordinal())).get()); + this.isNull = false; + } + + @Override + public RecordIdentifier getRecordIdentifier() { + return this.isNull ? null : recordIdentifier; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/test/queries/clientpositive/orc_llap_nonvector.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q index 4dfb259..95a0384 100644 --- a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q +++ b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q @@ -41,5 +41,16 @@ explain select cint, cstring1 from orc_llap_nonvector limit 1025; select cint, cstring1 from orc_llap_nonvector limit 1025; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as +select *, rand(1234) rdm from alltypesorc order by rdm; + +explain +select ROW__ID from orc_llap_nonvector_2 limit 10; +select ROW__ID from orc_llap_nonvector_2 limit 10; + DROP TABLE orc_create_staging_n3; DROP TABLE orc_llap_nonvector; +DROP TABLE orc_llap_nonvector_2; http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out index 364da93..c108be0 100644 --- a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out +++ b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out @@ -1292,6 +1292,92 @@ NULL NULL 1053814436 By4JbbLm4g1Kyq67Er 528534767 cvLH6Eat2yFsyy7p NULL NULL +PREHOOK: query: create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as +select *, rand(1234) rdm from alltypesorc order by rdm +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap_nonvector_2 +POSTHOOK: query: create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as +select *, rand(1234) rdm from alltypesorc order by rdm +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap_nonvector_2 +POSTHOOK: Lineage: orc_llap_nonvector_2.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] +POSTHOOK: Lineage: orc_llap_nonvector_2.rdm EXPRESSION [] +PREHOOK: query: explain +select ROW__ID from orc_llap_nonvector_2 limit 10 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select ROW__ID from orc_llap_nonvector_2 limit 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap_nonvector_2 + Statistics: Num rows: 12288 Data size: 4468050 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: ROW__ID (type: struct<writeid:bigint,bucketid:int,rowid:bigint>) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 933888 Basic stats: COMPLETE Column stats: COMPLETE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 760 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 760 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: llap + LLAP IO: may be used (ACID table) + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_nonvector_2 +#### A masked pattern was here #### +POSTHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_nonvector_2 +#### A masked pattern was here #### +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":0} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":1} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":2} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":3} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":4} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":5} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":6} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":7} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":8} +{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":9} PREHOOK: query: DROP TABLE orc_create_staging_n3 PREHOOK: type: DROPTABLE PREHOOK: Input: default@orc_create_staging_n3 @@ -1308,3 +1394,11 @@ POSTHOOK: query: DROP TABLE orc_llap_nonvector POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@orc_llap_nonvector POSTHOOK: Output: default@orc_llap_nonvector +PREHOOK: query: DROP TABLE orc_llap_nonvector_2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap_nonvector_2 +PREHOOK: Output: default@orc_llap_nonvector_2 +POSTHOOK: query: DROP TABLE orc_llap_nonvector_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap_nonvector_2 +POSTHOOK: Output: default@orc_llap_nonvector_2