Repository: hive
Updated Branches:
  refs/heads/master ce2754daf -> 16225d204


HIVE-20302: LLAP: non-vectorized execution in IO ignores virtual columns, 
including ROW__ID (Jesus Camacho Rodriguez, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/16225d20
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/16225d20
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/16225d20

Branch: refs/heads/master
Commit: 16225d204bf5acdd7ac08b3f1d3b8c5634f006f2
Parents: ce2754d
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Fri Aug 3 15:22:16 2018 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Fri Aug 3 19:01:11 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/BatchToRowReader.java     | 28 +++++-
 .../hive/ql/io/orc/OrcOiBatchToRowReader.java   | 40 ++++++++-
 .../queries/clientpositive/orc_llap_nonvector.q | 11 +++
 .../llap/orc_llap_nonvector.q.out               | 94 ++++++++++++++++++++
 4 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
index 434a5b8..9d1d260 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.llap.DebugUtils;
 
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -69,6 +70,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.RecordReader;
 
 /**
@@ -89,6 +91,8 @@ public abstract class BatchToRowReader<StructType, UnionType>
   private final boolean[] included;
   private int rowInBatch = 0;
 
+  private final int rowIdIdx;
+
   public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> 
vrbReader,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     this.vrbReader = vrbReader;
@@ -104,6 +108,11 @@ public abstract class BatchToRowReader<StructType, 
UnionType>
     } else {
       Arrays.fill(included, true);
     }
+    // Create struct for ROW__ID virtual column and extract index
+    this.rowIdIdx = vrbCtx.findVirtualColumnNum(VirtualColumn.ROWID);
+    if (this.rowIdIdx >= 0) {
+      included[rowIdIdx] = true;
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Including the columns " + DebugUtils.toString(included));
     }
@@ -113,9 +122,11 @@ public abstract class BatchToRowReader<StructType, 
UnionType>
   protected abstract StructType createStructObject(Object previous, 
List<TypeInfo> childrenTypes);
   protected abstract void setStructCol(StructType structObj, int i, Object 
value);
   protected abstract Object getStructCol(StructType structObj, int i);
+  protected abstract int getStructLength(StructType structObj);
   protected abstract UnionType createUnionObject(List<TypeInfo> childrenTypes, 
Object previous);
   protected abstract void setUnion(UnionType unionObj, byte tag, Object 
object);
   protected abstract Object getUnionField(UnionType unionObj);
+  protected abstract void populateRecordIdentifier(StructType o);
 
   @Override
   public NullWritable createKey() {
@@ -138,17 +149,26 @@ public abstract class BatchToRowReader<StructType, 
UnionType>
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public boolean next(NullWritable key, Object previous) throws IOException {
     if (!ensureBatch()) {
       return false;
     }
-    @SuppressWarnings("unchecked")
-    StructType value = (StructType)previous;
+
+    if (this.rowIdIdx >= 0) {
+      populateRecordIdentifier(null);
+    }
+
+    StructType value = (StructType) previous;
     for (int i = 0; i < schema.size(); ++i) {
-      if (!included[i]) continue; // TODO: shortcut for last col below length?
+      if (!included[i] || i >= getStructLength(value)) continue;
       try {
         setStructCol(value, i,
             nextValue(batch.cols[i], rowInBatch, schema.get(i), 
getStructCol(value, i)));
+        if (i == rowIdIdx) {
+          // Populate key
+          populateRecordIdentifier((StructType) getStructCol(value, i));
+        }
       } catch (Throwable t) {
         LOG.error("Error at row " + rowInBatch + "/" + batch.size + ", column 
" + i
             + "/" + schema.size() + " " + batch.cols[i], t);

http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
index c9ff592..bfd6eae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
@@ -19,24 +19,37 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier.Field;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.RecordReader;
 
 /** BatchToRowReader that returns the rows readable by ORC IOs. */
-public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, 
OrcUnion> {
+public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, 
OrcUnion>
+    implements AcidRecordReader<NullWritable, Object> {
+
+  private final RecordIdentifier recordIdentifier;
+  private boolean isNull;
+
   public OrcOiBatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> 
vrbReader,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     super(vrbReader, vrbCtx, includedCols);
+    this.recordIdentifier = new RecordIdentifier();
+    this.isNull = true;
   }
 
   @Override
   protected OrcStruct createStructObject(Object previous, List<TypeInfo> 
childrenTypes) {
     int numChildren = childrenTypes.size();
-    if (!(previous instanceof OrcStruct)) {
+    if (previous == null || !(previous instanceof OrcStruct)) {
       return new OrcStruct(numChildren);
     }
     OrcStruct result = (OrcStruct) previous;
@@ -45,6 +58,11 @@ public class OrcOiBatchToRowReader extends 
BatchToRowReader<OrcStruct, OrcUnion>
   }
 
   @Override
+  protected int getStructLength(OrcStruct structObj) {
+    return structObj.getNumFields();
+  }
+
+  @Override
   protected OrcUnion createUnionObject(List<TypeInfo> childrenTypes, Object 
previous) {
     return (previous instanceof OrcUnion) ? (OrcUnion)previous : new 
OrcUnion();
   }
@@ -68,4 +86,22 @@ public class OrcOiBatchToRowReader extends 
BatchToRowReader<OrcStruct, OrcUnion>
   protected void setUnion(OrcUnion unionObj, byte tag, Object object) {
     unionObj.set(tag, object);
   }
+
+  @Override
+  protected void populateRecordIdentifier(OrcStruct rowId) {
+    if (rowId == null) {
+      this.isNull = true;
+      return;
+    }
+    recordIdentifier.setValues(((LongWritable) 
rowId.getFieldValue(Field.writeId.ordinal())).get(),
+        ((IntWritable) rowId.getFieldValue(Field.bucketId.ordinal())).get(),
+        ((LongWritable) rowId.getFieldValue(Field.rowId.ordinal())).get());
+    this.isNull = false;
+  }
+
+  @Override
+  public RecordIdentifier getRecordIdentifier() {
+    return this.isNull ? null : recordIdentifier;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q 
b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
index 4dfb259..95a0384 100644
--- a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
+++ b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
@@ -41,5 +41,16 @@ explain
 select cint, cstring1 from orc_llap_nonvector limit 1025;
 select cint, cstring1 from orc_llap_nonvector limit 1025;
 
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table orc_llap_nonvector_2 stored as orc 
tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm;
+
+explain
+select ROW__ID from orc_llap_nonvector_2 limit 10;
+select ROW__ID from orc_llap_nonvector_2 limit 10;
+
 DROP TABLE orc_create_staging_n3;
 DROP TABLE orc_llap_nonvector;
+DROP TABLE orc_llap_nonvector_2;

http://git-wip-us.apache.org/repos/asf/hive/blob/16225d20/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out 
b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
index 364da93..c108be0 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
@@ -1292,6 +1292,92 @@ NULL     NULL
 1053814436     By4JbbLm4g1Kyq67Er
 528534767      cvLH6Eat2yFsyy7p
 NULL   NULL
+PREHOOK: query: create table orc_llap_nonvector_2 stored as orc 
tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: query: create table orc_llap_nonvector_2 stored as orc 
tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: Lineage: orc_llap_nonvector_2.cbigint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), 
]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cdouble SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), 
]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cfloat SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.csmallint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cstring1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cstring2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp2 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctinyint SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, 
comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.rdm EXPRESSION []
+PREHOOK: query: explain
+select ROW__ID from orc_llap_nonvector_2 limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select ROW__ID from orc_llap_nonvector_2 limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap_nonvector_2
+                  Statistics: Num rows: 12288 Data size: 4468050 Basic stats: 
COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: ROW__ID (type: 
struct<writeid:bigint,bucketid:int,rowid:bigint>)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 12288 Data size: 933888 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 10
+                      Statistics: Num rows: 10 Data size: 760 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 10 Data size: 760 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_llap_nonvector_2
+#### A masked pattern was here ####
+POSTHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_llap_nonvector_2
+#### A masked pattern was here ####
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":0}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":1}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":2}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":3}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":4}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":5}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":6}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":7}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":8}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":9}
 PREHOOK: query: DROP TABLE orc_create_staging_n3
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@orc_create_staging_n3
@@ -1308,3 +1394,11 @@ POSTHOOK: query: DROP TABLE orc_llap_nonvector
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@orc_llap_nonvector
 POSTHOOK: Output: default@orc_llap_nonvector
+PREHOOK: query: DROP TABLE orc_llap_nonvector_2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orc_llap_nonvector_2
+PREHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: query: DROP TABLE orc_llap_nonvector_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orc_llap_nonvector_2
+POSTHOOK: Output: default@orc_llap_nonvector_2

Reply via email to