HIVE-16369: Vectorization: Support PTF (Part 1: No Custom Window Framing -- 
Default Only) (Matt McCline, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0df0ace
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0df0ace
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0df0ace

Branch: refs/heads/master
Commit: a0df0ace2ac179329f39c43ad0b67da9ba3ba95b
Parents: fed915c
Author: Matt McCline <mmccl...@hortonworks.com>
Authored: Thu Jul 20 05:16:08 2017 -0500
Committer: Matt McCline <mmccl...@hortonworks.com>
Committed: Thu Jul 20 05:16:08 2017 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |     4 +
 .../apache/hadoop/hive/ql/exec/Operator.java    |     7 +
 .../hadoop/hive/ql/exec/OperatorFactory.java    |     3 +
 .../ql/exec/spark/SparkReduceRecordHandler.java |    16 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |    62 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |    61 +-
 .../ql/exec/vector/VectorSelectOperator.java    |     8 +
 .../ql/exec/vector/VectorizationContext.java    |     9 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     |    61 +-
 .../exec/vector/ptf/VectorPTFEvaluatorBase.java |   122 +
 .../vector/ptf/VectorPTFEvaluatorCount.java     |   108 +
 .../vector/ptf/VectorPTFEvaluatorCountStar.java |    79 +
 .../ptf/VectorPTFEvaluatorDecimalAvg.java       |   162 +
 .../VectorPTFEvaluatorDecimalFirstValue.java    |   117 +
 .../ptf/VectorPTFEvaluatorDecimalLastValue.java |   113 +
 .../ptf/VectorPTFEvaluatorDecimalMax.java       |   146 +
 .../ptf/VectorPTFEvaluatorDecimalMin.java       |   146 +
 .../ptf/VectorPTFEvaluatorDecimalSum.java       |   140 +
 .../vector/ptf/VectorPTFEvaluatorDenseRank.java |    77 +
 .../vector/ptf/VectorPTFEvaluatorDoubleAvg.java |   153 +
 .../ptf/VectorPTFEvaluatorDoubleFirstValue.java |   113 +
 .../ptf/VectorPTFEvaluatorDoubleLastValue.java  |   109 +
 .../vector/ptf/VectorPTFEvaluatorDoubleMax.java |   136 +
 .../vector/ptf/VectorPTFEvaluatorDoubleMin.java |   136 +
 .../vector/ptf/VectorPTFEvaluatorDoubleSum.java |   134 +
 .../vector/ptf/VectorPTFEvaluatorLongAvg.java   |   153 +
 .../ptf/VectorPTFEvaluatorLongFirstValue.java   |   113 +
 .../ptf/VectorPTFEvaluatorLongLastValue.java    |   109 +
 .../vector/ptf/VectorPTFEvaluatorLongMax.java   |   136 +
 .../vector/ptf/VectorPTFEvaluatorLongMin.java   |   136 +
 .../vector/ptf/VectorPTFEvaluatorLongSum.java   |   134 +
 .../exec/vector/ptf/VectorPTFEvaluatorRank.java |    81 +
 .../vector/ptf/VectorPTFEvaluatorRowNumber.java |    77 +
 .../exec/vector/ptf/VectorPTFGroupBatches.java  |   216 +
 .../ql/exec/vector/ptf/VectorPTFOperator.java   |   570 +
 .../hive/ql/optimizer/physical/Vectorizer.java  |   522 +-
 .../org/apache/hadoop/hive/ql/plan/PTFDesc.java |   112 +
 .../hadoop/hive/ql/plan/VectorPTFDesc.java      |   360 +
 .../hadoop/hive/ql/plan/VectorPTFInfo.java      |   161 +
 .../hadoop/hive/ql/plan/ptf/WindowFrameDef.java |     2 +-
 .../hive/ql/udf/generic/GenericUDFLeadLag.java  |     4 +-
 .../clientpositive/vector_groupby_cube1.q       |    17 +-
 .../vector_groupby_grouping_id1.q               |    21 +
 .../vector_groupby_grouping_id2.q               |    52 +
 .../vector_groupby_grouping_window.q            |     1 +
 .../vector_outer_reference_windowed.q           |    91 +
 .../clientpositive/vector_ptf_part_simple.q     |   240 +-
 .../queries/clientpositive/vector_windowing.q   |   791 ++
 .../vector_windowing_expressions.q              |    94 +
 .../clientpositive/vector_windowing_gby.q       |    21 +
 .../clientpositive/vector_windowing_gby2.q      |    48 +
 .../vector_windowing_multipartitioning.q        |    73 +
 .../clientpositive/vector_windowing_navfn.q     |     2 +
 .../vector_windowing_order_null.q               |    58 +
 .../vector_windowing_range_multiorder.q         |    68 +
 .../clientpositive/vector_windowing_rank.q      |   117 +
 .../clientpositive/vector_windowing_streaming.q |    85 +
 .../vector_windowing_windowspec.q               |    70 +
 .../vector_windowing_windowspec4.q              |    37 +
 .../queries/clientpositive/vectorized_ptf.q     |     1 +
 .../clientpositive/correlationoptimizer12.q.out |     4 +-
 .../results/clientpositive/ctas_colname.q.out   |     4 +-
 .../clientpositive/distinct_windowing.q.out     |     8 +-
 .../distinct_windowing_no_cbo.q.out             |    12 +-
 .../groupby_grouping_window.q.out               |     2 +-
 .../llap/groupby_resolution.q.out               |     2 +-
 .../test/results/clientpositive/llap/ptf.q.out  |   114 +-
 .../clientpositive/llap/ptf_streaming.q.out     |    62 +-
 .../clientpositive/llap/subquery_in.q.out       |     4 +-
 .../clientpositive/llap/subquery_notin.q.out    |    14 +-
 .../clientpositive/llap/subquery_scalar.q.out   |     4 +-
 .../llap/vector_groupby_cube1.q.out             |   176 +-
 .../llap/vector_groupby_grouping_id1.q.out      |   800 ++
 .../llap/vector_groupby_grouping_id2.q.out      |  2117 +++
 .../llap/vector_groupby_grouping_window.q.out   |     4 +-
 .../llap/vector_outer_reference_windowed.q.out  |  1435 ++
 .../llap/vector_ptf_part_simple.q.out           |  3880 ++++-
 .../llap/vector_tablesample_rows.q.out          |    68 +-
 .../clientpositive/llap/vector_windowing.q.out  |  9768 +++++++++++++
 .../llap/vector_windowing_expressions.q.out     |  2022 +++
 .../llap/vector_windowing_gby.q.out             |   314 +
 .../llap/vector_windowing_gby2.q.out            |  1158 ++
 .../vector_windowing_multipartitioning.q.out    | 11587 +++++++++++++++
 .../llap/vector_windowing_navfn.q.out           |   218 +-
 .../llap/vector_windowing_order_null.q.out      |  1232 ++
 .../vector_windowing_range_multiorder.q.out     | 12584 +++++++++++++++++
 .../llap/vector_windowing_rank.q.out            |  1860 +++
 .../llap/vector_windowing_streaming.q.out       |  1036 ++
 .../llap/vector_windowing_windowspec.q.out      |  2374 ++++
 .../llap/vector_windowing_windowspec4.q.out     |   223 +
 .../llap/vectorization_limit.q.out              |    24 +-
 .../llap/vectorized_mapjoin2.q.out              |     4 +-
 .../clientpositive/llap/vectorized_ptf.q.out    |   294 +-
 .../results/clientpositive/llap/windowing.q.out |     2 +-
 .../llap/windowing_windowspec.q.out             |   955 ++
 .../outer_reference_windowed.q.out              |    10 +-
 ql/src/test/results/clientpositive/pcs.q.out    |     2 +-
 .../results/clientpositive/ppd_windowing1.q.out |    66 +-
 .../results/clientpositive/ptfgroupbyjoin.q.out |     2 +-
 .../results/clientpositive/quotedid_basic.q.out |     4 +-
 .../test/results/clientpositive/semijoin2.q.out |     4 +-
 .../test/results/clientpositive/semijoin4.q.out |     2 +-
 .../test/results/clientpositive/semijoin5.q.out |     4 +-
 .../spark/groupby_resolution.q.out              |     2 +-
 .../test/results/clientpositive/spark/ptf.q.out |   114 +-
 .../clientpositive/spark/ptf_streaming.q.out    |    62 +-
 .../clientpositive/spark/subquery_in.q.out      |     4 +-
 .../spark/union_remove_6_subq.q.out             |     2 +-
 .../clientpositive/spark/vectorized_ptf.q.out   |   294 +-
 .../clientpositive/spark/windowing.q.out        |     2 +-
 .../clientpositive/subquery_in_having.q.out     |     2 +-
 .../subquery_unqualcolumnrefs.q.out             |     4 +-
 .../clientpositive/union_remove_6_subq.q.out    |     2 +-
 .../vector_outer_reference_windowed.q.out       |  1221 ++
 .../clientpositive/vector_ptf_part_simple.q.out |  2895 ++++
 .../clientpositive/vector_windowing.q.out       |  9149 ++++++++++++
 .../vector_windowing_expressions.q.out          |  1822 +++
 .../clientpositive/vector_windowing_gby.q.out   |   252 +
 .../clientpositive/vector_windowing_gby2.q.out  |  1015 ++
 .../vector_windowing_multipartitioning.q.out    | 11288 +++++++++++++++
 .../vector_windowing_order_null.q.out           |   989 ++
 .../vector_windowing_range_multiorder.q.out     | 12239 ++++++++++++++++
 .../clientpositive/vector_windowing_rank.q.out  |  1543 ++
 .../vector_windowing_streaming.q.out            |   836 ++
 .../vector_windowing_windowspec.q.out           |  2060 +++
 .../vector_windowing_windowspec4.q.out          |   211 +
 .../results/clientpositive/windowing_gby2.q.out |    12 +-
 .../clientpositive/windowing_navfn.q.out        |     2 +-
 .../clientpositive/windowing_streaming.q.out    |     6 +-
 129 files changed, 106272 insertions(+), 1210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9c954be..69205bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2839,6 +2839,10 @@ public class HiveConf extends Configuration {
         "1. chosen : use VectorUDFAdaptor for a small set of UDFs that were 
choosen for good performance\n" +
         "2. all    : use VectorUDFAdaptor for all UDFs"
     ),
+    HIVE_VECTORIZATION_PTF_ENABLED("hive.vectorized.execution.ptf.enabled", 
false,
+        "This flag should be set to true to enable vectorized mode of the PTF 
of query execution.\n" +
+        "The default value is false."),
+
     
HIVE_VECTORIZATION_COMPLEX_TYPES_ENABLED("hive.vectorized.complex.types.enabled",
 true,
         "This flag should be set to true to enable vectorization\n" +
         "of expressions with complex types.\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 7f646c4..66c34aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -637,6 +637,13 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
     defaultEndGroup();
   }
 
+  // Tell the operator the status of the next key-grouped VectorizedRowBatch 
that will be delivered
+  // to the process method.  E.g. by reduce-shuffle.  These semantics are 
needed by PTF so it can
+  // efficiently add computed values to the last batch of a group key.
+  public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+    // Do nothing.
+  }
+
   // an blocking operator (e.g. GroupByOperator and JoinOperator) can
   // override this method to forward its outputs
   public void flush() throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index af1fa66..993da83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,8 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import 
org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import 
org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -138,6 +140,7 @@ public final class OperatorFactory {
     vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class);
     vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class);
     vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class);
+    vectorOpvec.put(PTFDesc.class, VectorPTFOperator.class);
     vectorOpvec.put(SparkHashTableSinkDesc.class, 
VectorSparkHashTableSinkOperator.class);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 36158a1..7c1164b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -442,13 +442,13 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
           // Flush current group batch as last batch of group.
           if (batch.size > 0) {
 
+            // Indicate last batch of current group.
+            reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true);
+
             // Forward; reset key and value columns.
             forwardBatch(/* resetValueColumnsOnly */ false);
-            reducer.endGroup();
           }
 
-          reducer.startGroup();
-
           // Deserialize group key into vector row columns.
           byte[] keyBytes = keyWritable.getBytes();
           int keyLength = keyWritable.getLength();
@@ -475,6 +475,9 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
         if (batch.size >= batch.getMaxSize() ||
             batch.size > 0 && batchBytes >= BATCH_BYTES) {
 
+          // We have a row for current group, so we indicate not the last 
batch.
+          reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ false);
+
           // Batch is full or using too much space.
           forwardBatch(/* resetValueColumnsOnly */ true);
         }
@@ -581,10 +584,13 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
     try {
       if (vectorized) {
         if (batch.size > 0) {
-          forwardBatch(/* resetValueColumnsOnly */ false);
+
           if (handleGroupKey) {
-            reducer.endGroup();
+            // Indicate last batch of current group.
+            reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true);
           }
+
+          forwardBatch(/* resetValueColumnsOnly */ false);
         }
       } else {
         if (groupKey != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 43f9db3..bdde81a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -369,20 +369,6 @@ public class ReduceRecordSource implements RecordSource {
       BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey();
       valueWritables = reader.getCurrentValues();
 
-      // Check if this is a new group or same group
-      if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
-        // If a operator wants to do some work at the beginning of a group
-        if (groupKey == null) { // the first group
-          this.groupKey = new BytesWritable();
-        } else {
-          // If a operator wants to do some work at the end of a group
-          reducer.endGroup();
-        }
-
-        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
-        reducer.startGroup();
-      }
-
       processVectorGroup(keyWritable, valueWritables, tag);
       return true;
     } catch (Throwable e) {
@@ -398,15 +384,20 @@ public class ReduceRecordSource implements RecordSource {
   }
 
   /**
+   * 
+   * @param keyWritable
    * @param values
-   * @return true if it is not done and can take more inputs
+   * @param tag
+   * @throws HiveException
+   * @throws IOException
    */
   private void processVectorGroup(BytesWritable keyWritable,
           Iterable<Object> values, byte tag) throws HiveException, IOException 
{
 
+    Preconditions.checkState(batch.size == 0);
+
     // Deserialize key into vector row columns.
-    // Since we referencing byte column vector byte arrays by reference, we 
don't need
-    // a data buffer.
+    //
     byte[] keyBytes = keyWritable.getBytes();
     int keyLength = keyWritable.getLength();
 
@@ -432,21 +423,14 @@ public class ReduceRecordSource implements RecordSource {
     int batchBytes = keyBytes.length;
     try {
       for (Object value : values) {
-        if (valueLazyBinaryDeserializeToRow != null) {
-          // Deserialize value into vector row columns.
-          BytesWritable valueWritable = (BytesWritable) value;
-          byte[] valueBytes = valueWritable.getBytes();
-          int valueLength = valueWritable.getLength();
-          batchBytes += valueLength;
-
-          valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength);
-          valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
-        }
-        rowIdx++;
-        if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) {
+        if (rowIdx >= maxSize ||
+            (rowIdx > 0 && batchBytes >= BATCH_BYTES)) {
 
-          // Batch is full.
+          // Batch is full AND we have at least 1 more row...
           batch.size = rowIdx;
+          if (handleGroupKey) {
+            reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ 
false);
+          }
           reducer.process(batch, tag);
 
           // Reset just the value columns and value buffer.
@@ -455,12 +439,26 @@ public class ReduceRecordSource implements RecordSource {
             batch.cols[i].reset();
           }
           rowIdx = 0;
-          batchBytes = 0;
+          batchBytes = keyBytes.length;
         }
+        if (valueLazyBinaryDeserializeToRow != null) {
+          // Deserialize value into vector row columns.
+          BytesWritable valueWritable = (BytesWritable) value;
+          byte[] valueBytes = valueWritable.getBytes();
+          int valueLength = valueWritable.getLength();
+          batchBytes += valueLength;
+
+          valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength);
+          valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
+        }
+        rowIdx++;
       }
       if (rowIdx > 0) {
         // Flush final partial batch.
-        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        batch.size = rowIdx;
+        if (handleGroupKey) {
+          reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true);
+        }
         reducer.process(batch, tag);
       }
       batch.reset();

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 613a31a..31f2621 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -148,8 +148,7 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
    */
   private static interface IProcessingMode {
     public void initialize(Configuration hconf) throws HiveException;
-    public void startGroup() throws HiveException;
-    public void endGroup() throws HiveException;
+    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException;
     public void processBatch(VectorizedRowBatch batch) throws HiveException;
     public void close(boolean aborted) throws HiveException;
   }
@@ -159,14 +158,10 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
    */
   private abstract class ProcessingModeBase implements IProcessingMode {
 
-    // Overridden and used in sorted reduce group batch processing mode.
+    // Overridden and used in ProcessingModeReduceMergePartial mode.
     @Override
-    public void startGroup() throws HiveException {
-      // Do nothing.
-    }
-    @Override
-    public void endGroup() throws HiveException {
-      // Do nothing.
+    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+      // Some Spark plans cause Hash and other modes to get this.  So, ignore 
it.
     }
 
     protected abstract void doProcessBatch(VectorizedRowBatch batch, boolean 
isFirstGroupingSet,
@@ -258,6 +253,11 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
     }
 
     @Override
+    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+      // Do nothing.
+    }
+
+    @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean 
isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
       for (int i = 0; i < aggregators.length; ++i) {
@@ -682,6 +682,11 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
     }
 
     @Override
+    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+      // Do nothing.
+    }
+
+    @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean 
isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
 
@@ -770,8 +775,8 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
    */
   private class ProcessingModeReduceMergePartial extends ProcessingModeBase {
 
-    private boolean inGroup;
     private boolean first;
+    private boolean isLastGroupBatch;
 
     /**
      * The group vector key helper.
@@ -790,7 +795,7 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
 
     @Override
     public void initialize(Configuration hconf) throws HiveException {
-      inGroup = false;
+      isLastGroupBatch = true;
 
       // We do not include the dummy grouping set column in the output.  So we 
pass outputKeyLength
       // instead of keyExpressions.length
@@ -802,24 +807,18 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
     }
 
     @Override
-    public void startGroup() throws HiveException {
-      inGroup = true;
-      first = true;
-    }
-
-    @Override
-    public void endGroup() throws HiveException {
-      if (inGroup && !first) {
-        writeGroupRow(groupAggregators, buffer);
-        groupAggregators.reset();
+    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+      if (this.isLastGroupBatch) {
+        // Previous batch was the last of a group of batches.  Remember the 
next is the first batch
+        // of a new group of batches.
+        first = true;
       }
-      inGroup = false;
+      this.isLastGroupBatch = isLastGroupBatch;
     }
 
     @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean 
isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
-      assert(inGroup);
       if (first) {
         // Copy the group key to output batch now.  We'll copy in the 
aggregates at the end of the group.
         first = false;
@@ -836,11 +835,16 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
       for (int i = 0; i < aggregators.length; ++i) {
         
aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch);
       }
+
+      if (isLastGroupBatch) {
+        writeGroupRow(groupAggregators, buffer);
+        groupAggregators.reset();
+      }
     }
 
     @Override
     public void close(boolean aborted) throws HiveException {
-      if (!aborted && inGroup && !first) {
+      if (!aborted && !first && !isLastGroupBatch) {
         writeGroupRow(groupAggregators, buffer);
       }
     }
@@ -1013,21 +1017,26 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc> implements
   }
 
   @Override
+  public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+    processingMode.setNextVectorBatchGroupStatus(isLastGroupBatch);
+  }
+
+  @Override
   public void startGroup() throws HiveException {
-    processingMode.startGroup();
 
     // We do not call startGroup on operators below because we are batching 
rows in
     // an output batch and the semantics will not work.
     // super.startGroup();
+    throw new HiveException("Unexpected startGroup");
   }
 
   @Override
   public void endGroup() throws HiveException {
-    processingMode.endGroup();
 
     // We do not call endGroup on operators below because we are batching rows 
in
     // an output batch and the semantics will not work.
     // super.endGroup();
+    throw new HiveException("Unexpected endGroup");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 17ccf21..5f1f952 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -110,6 +110,14 @@ public class VectorSelectOperator extends 
Operator<SelectDesc> implements
         outputFieldNames, objectInspectors);
   }
 
+  // Must send on to VectorPTFOperator...
+  @Override
+  public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException {
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setNextVectorBatchGroupStatus(isLastGroupBatch);
+    }
+  }
+
   @Override
   public void process(Object row, int tag) throws HiveException {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 503bd0c..9e026f0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -432,7 +432,7 @@ public class VectorizationContext {
     return udfsNeedingImplicitDecimalCast.contains(udfClass);
   }
 
-  protected int getInputColumnIndex(String name) throws HiveException {
+  public int getInputColumnIndex(String name) throws HiveException {
     if (name == null) {
       throw new HiveException("Null column name");
     }
@@ -464,7 +464,7 @@ public class VectorizationContext {
 
     private final Set<Integer> usedOutputColumns = new HashSet<Integer>();
 
-    int allocateOutputColumn(TypeInfo typeInfo) throws HiveException {
+    int allocateOutputColumn(TypeInfo typeInfo) {
         if (initialOutputCol < 0) {
           // This is a test calling.
           return 0;
@@ -525,7 +525,7 @@ public class VectorizationContext {
     }
   }
 
-  public int allocateScratchColumn(TypeInfo typeInfo) throws HiveException {
+  public int allocateScratchColumn(TypeInfo typeInfo) {
     return ocm.allocateOutputColumn(typeInfo);
   }
 
@@ -2672,8 +2672,7 @@ public class VectorizationContext {
     }
   }
 
-  static String getScratchName(TypeInfo typeInfo) throws HiveException {
-
+  static String getScratchName(TypeInfo typeInfo) {
     // For now, leave DECIMAL precision/scale in the name so 
DecimalColumnVector scratch columns
     // don't need their precision/scale adjusted...
     if (typeInfo.getCategory() == Category.PRIMITIVE &&

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 990e896..03c09e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -579,7 +579,7 @@ public class VectorizedBatchUtil {
     return typeInfoList.toArray(new TypeInfo[0]);
   }
 
-  static ColumnVector cloneColumnVector(ColumnVector source
+  public static ColumnVector makeLikeColumnVector(ColumnVector source
                                         ) throws HiveException{
     if (source instanceof LongColumnVector) {
       return new LongColumnVector(((LongColumnVector) source).vector.length);
@@ -598,25 +598,25 @@ public class VectorizedBatchUtil {
       return new IntervalDayTimeColumnVector(((IntervalDayTimeColumnVector) 
source).getLength());
     } else if (source instanceof ListColumnVector) {
       ListColumnVector src = (ListColumnVector) source;
-      ColumnVector child = cloneColumnVector(src.child);
+      ColumnVector child = makeLikeColumnVector(src.child);
       return new ListColumnVector(src.offsets.length, child);
     } else if (source instanceof MapColumnVector) {
       MapColumnVector src = (MapColumnVector) source;
-      ColumnVector keys = cloneColumnVector(src.keys);
-      ColumnVector values = cloneColumnVector(src.values);
+      ColumnVector keys = makeLikeColumnVector(src.keys);
+      ColumnVector values = makeLikeColumnVector(src.values);
       return new MapColumnVector(src.offsets.length, keys, values);
     } else if (source instanceof StructColumnVector) {
       StructColumnVector src = (StructColumnVector) source;
       ColumnVector[] copy = new ColumnVector[src.fields.length];
       for(int i=0; i < copy.length; ++i) {
-        copy[i] = cloneColumnVector(src.fields[i]);
+        copy[i] = makeLikeColumnVector(src.fields[i]);
       }
       return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, copy);
     } else if (source instanceof UnionColumnVector) {
       UnionColumnVector src = (UnionColumnVector) source;
       ColumnVector[] copy = new ColumnVector[src.fields.length];
       for(int i=0; i < copy.length; ++i) {
-        copy[i] = cloneColumnVector(src.fields[i]);
+        copy[i] = makeLikeColumnVector(src.fields[i]);
       }
       return new UnionColumnVector(src.tags.length, copy);
     } else
@@ -625,6 +625,53 @@ public class VectorizedBatchUtil {
           " is not supported!");
   }
 
+  /**
+   * Exchanges the column vector stored at batch1ColumnNum in batch1 with the one stored at
+   * batch2ColumnNum in batch2.  Only the two references in the batches' cols arrays are
+   * exchanged; the column vectors themselves are not modified.
+   */
+  public static void swapColumnVector(
+      VectorizedRowBatch batch1, int batch1ColumnNum,
+      VectorizedRowBatch batch2, int batch2ColumnNum) {
+    final ColumnVector[] firstCols = batch1.cols;
+    final ColumnVector movedToSecond = firstCols[batch1ColumnNum];
+    final ColumnVector[] secondCols = batch2.cols;
+    firstCols[batch1ColumnNum] = secondCols[batch2ColumnNum];
+    secondCols[batch2ColumnNum] = movedToSecond;
+  }
+
+  /**
+   * Copies entry 0 of a source column into entry 0 of a target column and marks the target
+   * column as repeating.
+   *
+   * The source entry is treated as NULL only when the source column may contain nulls
+   * (noNulls is false) AND isNull[0] is set.  noNulls being false on its own does not make
+   * entry 0 NULL.  On the non-NULL path the target's NULL flags are reset so that a NULL left
+   * over from an earlier use of the target column does not hide the copied value.
+   *
+   * @param sourceBatch batch holding the source column
+   * @param sourceColumnNum index of the source column in sourceBatch.cols
+   * @param targetBatch batch holding the target column
+   * @param targetColumnNum index of the target column in targetBatch.cols
+   * @param setByValue for bytes columns only: true copies the bytes into the target (setVal),
+   *          false makes the target reference the source bytes (setRef)
+   * @throws RuntimeException when the source column vector class is not one of the classes
+   *           handled below
+   */
+  public static void copyRepeatingColumn(VectorizedRowBatch sourceBatch, int sourceColumnNum,
+      VectorizedRowBatch targetBatch, int targetColumnNum, boolean setByValue) {
+    ColumnVector sourceColVector = sourceBatch.cols[sourceColumnNum];
+    ColumnVector targetColVector = targetBatch.cols[targetColumnNum];
+
+    targetColVector.isRepeating = true;
+
+    // The isNull array is only meaningful when noNulls is false, so both must be consulted.
+    if (!sourceColVector.noNulls && sourceColVector.isNull[0]) {
+      targetColVector.noNulls = false;
+      targetColVector.isNull[0] = true;
+      return;
+    }
+
+    // Non-NULL value: clear any NULL state remaining on the (reused) target column.
+    targetColVector.noNulls = true;
+    targetColVector.isNull[0] = false;
+
+    if (sourceColVector instanceof LongColumnVector) {
+      ((LongColumnVector) targetColVector).vector[0] =
+          ((LongColumnVector) sourceColVector).vector[0];
+    } else if (sourceColVector instanceof DoubleColumnVector) {
+      ((DoubleColumnVector) targetColVector).vector[0] =
+          ((DoubleColumnVector) sourceColVector).vector[0];
+    } else if (sourceColVector instanceof BytesColumnVector) {
+      BytesColumnVector bytesColVector = (BytesColumnVector) sourceColVector;
+      byte[] bytes = bytesColVector.vector[0];
+      final int start = bytesColVector.start[0];
+      final int length = bytesColVector.length[0];
+      if (setByValue) {
+        ((BytesColumnVector) targetColVector).setVal(0, bytes, start, length);
+      } else {
+        ((BytesColumnVector) targetColVector).setRef(0, bytes, start, length);
+      }
+    } else if (sourceColVector instanceof DecimalColumnVector) {
+      ((DecimalColumnVector) targetColVector).set(
+          0, ((DecimalColumnVector) sourceColVector).vector[0]);
+    } else if (sourceColVector instanceof TimestampColumnVector) {
+      ((TimestampColumnVector) targetColVector).set(
+          0, ((TimestampColumnVector) sourceColVector).asScratchTimestamp(0));
+    } else if (sourceColVector instanceof IntervalDayTimeColumnVector) {
+      ((IntervalDayTimeColumnVector) targetColVector).set(
+          0, ((IntervalDayTimeColumnVector) sourceColVector).asScratchIntervalDayTime(0));
+    } else {
+      throw new RuntimeException("Column vector class " + sourceColVector.getClass().getName() +
+          " is not supported!");
+    }
+  }
+
   /**
    * Make a new (scratch) batch, which is exactly "like" the batch provided, 
except that it's empty
    * @param batch the batch to imitate
@@ -635,7 +682,7 @@ public class VectorizedBatchUtil {
     VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
     for (int i = 0; i < batch.numCols; i++) {
       if (batch.cols[i] != null) {
-        newBatch.cols[i] = cloneColumnVector(batch.cols[i]);
+        newBatch.cols[i] = makeLikeColumnVector(batch.cols[i]);
         newBatch.cols[i].init();
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
new file mode 100644
index 0000000..beca5f9
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+/**
+ * This is the vector PTF evaluator base class.  An evaluator does the group batch aggregation work
+ * on an aggregation's 0 or 1 argument(s) and at some point will fill in an output column with the
+ * aggregation result.  The aggregation argument is an input column or expression, or no argument.
+ *
+ * When the aggregation is streaming (e.g. row_number, rank, first_value, etc), the output column
+ * can be filled in immediately by the implementation of evaluateGroupBatch.
+ *
+ * For non-streaming aggregations, the aggregation result is not known until the last group batch
+ * is processed.  After the last group batch has been processed, the VectorPTFGroupBatches class
+ * will call the isGroupResultNull, getResultColumnVectorType, getLongGroupResult |
+ * getDoubleGroupResult | getDecimalGroupResult, and getOutputColumnNum methods to get aggregation
+ * result information necessary to write it into the output column (as a repeated column) of all
+ * the group batches.
+ */
+public abstract class VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorBase.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected final WindowFrameDef windowFrameDef;
+  private final VectorExpression inputVecExpr;
+  protected final int inputColumnNum;
+  protected final int outputColumnNum;
+
+  /**
+   * @param windowFrameDef the window frame definition; stored unchanged
+   * @param inputVecExpr the aggregation argument expression, or null when the aggregation has
+   *          no argument
+   * @param outputColumnNum the batch column number reported by getOutputColumnNum
+   */
+  public VectorPTFEvaluatorBase(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    this.windowFrameDef = windowFrameDef;
+    this.outputColumnNum = outputColumnNum;
+
+    // Without an argument there is no input column (-1); otherwise the argument is read from
+    // the expression's output column.
+    this.inputColumnNum = (inputVecExpr == null) ? -1 : inputVecExpr.getOutputColumn();
+
+    // An IdentityExpression is not retained, so evaluateInputExpr does nothing for it.
+    this.inputVecExpr =
+        (inputVecExpr instanceof IdentityExpression) ? null : inputVecExpr;
+  }
+
+  /**
+   * Evaluates the aggregation input argument expression on the batch, when one was retained.
+   */
+  public void evaluateInputExpr(VectorizedRowBatch batch) {
+    if (inputVecExpr == null) {
+      return;
+    }
+    inputVecExpr.evaluate(batch);
+  }
+
+  /**
+   * Evaluates the aggregation over one of the group's batches.
+   */
+  public abstract void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch);
+
+  /**
+   * Returns true if the aggregation result will be streamed.  The base implementation assumes
+   * the evaluator is not streaming.
+   */
+  public boolean streamsResult() {
+    return false;
+  }
+
+  public int getOutputColumnNum() {
+    return outputColumnNum;
+  }
+
+  /**
+   * After processing all the group's batches with evaluateGroupBatch, is the non-streaming
+   * aggregation result null?  The base implementation answers false.
+   */
+  public boolean isGroupResultNull() {
+    return false;
+  }
+
+  /**
+   * What is the ColumnVector type of the aggregation result?
+   */
+  public abstract Type getResultColumnVectorType();
+
+  /*
+   * The getters below supply the non-streaming aggregation result value (chosen according to
+   * getResultColumnVectorType) once all the group's batches have been processed with
+   * evaluateGroupBatch and isGroupResultNull is false.  Each base implementation throws.
+   */
+
+  public long getLongGroupResult() {
+    throw new RuntimeException(
+        "No long group result evaluator implementation " + this.getClass().getName());
+  }
+
+  public double getDoubleGroupResult() {
+    throw new RuntimeException(
+        "No double group result evaluator implementation " + this.getClass().getName());
+  }
+
+  public HiveDecimalWritable getDecimalGroupResult() {
+    throw new RuntimeException(
+        "No decimal group result evaluator implementation " + this.getClass().getName());
+  }
+
+  /**
+   * Resets the aggregation calculation variable(s).
+   */
+  public abstract void resetEvaluator();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
new file mode 100644
index 0000000..638fc9e
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates count(column) for a PTF group.
+ *
+ * Count any rows of the group where the input column/expression is non-null.
+ */
+public class VectorPTFEvaluatorCount extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorCount.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // Number of non-null input rows seen since the last resetEvaluator.
+  protected long count;
+
+  public VectorPTFEvaluatorCount(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Adds the number of non-null input column rows of this batch to the group count.
+   *
+   * @param batch one of the group's batches; must not have selectedInUse set
+   * @param isLastGroupBatch not used by this evaluator
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Count non-null column rows.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    ColumnVector colVector = batch.cols[inputColumnNum];
+    if (colVector.isRepeating) {
+
+      // Entry 0 stands for every row.  It is NULL only when noNulls is false AND isNull[0]
+      // is set; noNulls being false by itself does not make the entry NULL.
+      if (colVector.noNulls || !colVector.isNull[0]) {
+        count += size;
+      }
+    } else if (colVector.noNulls) {
+      count += size;
+    } else {
+      boolean[] batchIsNull = colVector.isNull;
+      long varCount = 0;
+      for (int i = 0; i < size; i++) {
+        if (!batchIsNull[i]) {
+          varCount++;
+        }
+      }
+      count += varCount;
+    }
+  }
+
+  /**
+   * The count result is never null.
+   */
+  @Override
+  public boolean isGroupResultNull() {
+    return false;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  @Override
+  public long getLongGroupResult() {
+    return count;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    count = 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
new file mode 100644
index 0000000..cf8e626
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates count(*) for a PTF group.
+ *
+ * Count all rows of the group.  No input column/expression.
+ */
+public class VectorPTFEvaluatorCountStar extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorCountStar.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // Number of rows seen since the last resetEvaluator.
+  protected long count;
+
+  public VectorPTFEvaluatorCountStar(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  /**
+   * Adds the batch's row count to the group count.  COUNT(*) has no input expression, so
+   * evaluateInputExpr is deliberately not called here.
+   */
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) {
+    final int rowsInBatch = batch.size;
+    count = count + rowsInBatch;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    count = 0;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  /**
+   * The row count is never null.
+   */
+  @Override
+  public boolean isGroupResultNull() {
+    return false;
+  }
+
+  @Override
+  public long getLongGroupResult() {
+    return count;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
new file mode 100644
index 0000000..599e73b
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal avg() for a PTF group.
+ *
+ * Sum up non-null column values; group result is sum / non-null count.
+ */
+public class VectorPTFEvaluatorDecimalAvg extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorDecimalAvg.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // True until the first non-null value of the group has been added to sum.
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable sum;
+
+  // Number of non-null values added to sum.  Kept as a long so that a very large group cannot
+  // overflow it.
+  private long nonNullGroupCount;
+
+  // Scratch value used for value * size and for the final division.
+  private HiveDecimalWritable temp;
+  private HiveDecimalWritable avg;
+
+  public VectorPTFEvaluatorDecimalAvg(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    temp = new HiveDecimalWritable();
+    avg = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Sums all non-null decimal column values of the batch and maintains isGroupResultNull.
+   * On the last group batch the average (sum / non-null count) is computed when at least one
+   * non-null value was seen in the group.
+   *
+   * The average is computed on the last batch even when that batch is empty or holds only
+   * NULLs, so a group whose non-null values all arrived in earlier batches still gets its
+   * result.
+   *
+   * @param batch one of the group's batches; must not have selectedInUse set
+   * @param isLastGroupBatch true when this is the final batch of the group
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    if (batch.size > 0) {
+      accumulateBatch(batch);
+    }
+
+    if (isLastGroupBatch && !isGroupResultNull) {
+      avg.set(sum);
+      temp.setFromLong(nonNullGroupCount);
+      avg.mutateDivide(temp);
+    }
+  }
+
+  // Adds the non-null input values of a non-empty batch to sum and nonNullGroupCount.
+  private void accumulateBatch(VectorizedRowBatch batch) {
+    final int size = batch.size;
+    DecimalColumnVector decimalColVector =
+        (DecimalColumnVector) batch.cols[inputColumnNum];
+    HiveDecimalWritable[] vector = decimalColVector.vector;
+
+    if (decimalColVector.isRepeating) {
+
+      // Entry 0 stands for every row.  It is NULL only when noNulls is false AND isNull[0]
+      // is set.
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+
+        // We have a repeated value.  The sum increases by value * batch.size.
+        temp.setFromLong(size);
+        temp.mutateMultiply(vector[0]);
+        addToSum(temp);
+        nonNullGroupCount += size;
+      }
+    } else if (decimalColVector.noNulls) {
+      for (int i = 0; i < size; i++) {
+        addToSum(vector[i]);
+      }
+      nonNullGroupCount += size;
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      for (int i = 0; i < size; i++) {
+        if (!batchIsNull[i]) {
+          addToSum(vector[i]);
+          nonNullGroupCount++;
+        }
+      }
+    }
+  }
+
+  // Adds one value to sum; the first value of the group replaces sum and clears the null flag.
+  private void addToSum(HiveDecimalWritable value) {
+    if (isGroupResultNull) {
+
+      // First aggregation calculation for group.
+      sum.set(value);
+      isGroupResultNull = false;
+    } else {
+      sum.mutateAdd(value);
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimalGroupResult() {
+    return avg;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    sum.set(HiveDecimal.ZERO);
+    nonNullGroupCount = 0;
+    avg.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
new file mode 100644
index 0000000..01a8c53
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.FastHiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal first_value() for a PTF group.
+ *
+ * We capture the first value from the first batch.  It can be NULL.
+ * We then set (stream) the output column with that value as repeated in each batch.
+ */
+public class VectorPTFEvaluatorDecimalFirstValue
+    extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorDecimalFirstValue.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // True once the first row of the group has been examined.
+  protected boolean haveFirstValue;
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable firstValue;
+
+  public VectorPTFEvaluatorDecimalFirstValue(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    firstValue = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Captures the first row's value (or NULL) from the first non-empty batch of the group, then
+   * fills the output column of the batch with that value as a repeated entry.
+   *
+   * An empty batch seen before the first value has been captured is left untouched.
+   *
+   * @param batch one of the group's batches; must not have selectedInUse set
+   * @param isLastGroupBatch not used by this evaluator
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // First row determines isGroupResultNull and decimal firstValue; stream fill result as
+    // repeated.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    if (!haveFirstValue) {
+      final int size = batch.size;
+      if (size == 0) {
+        return;
+      }
+      DecimalColumnVector decimalColVector =
+          (DecimalColumnVector) batch.cols[inputColumnNum];
+
+      // Repeating or not, the first row is entry 0.  It is NULL only when noNulls is false
+      // AND isNull[0] is set; noNulls being false by itself does not make the entry NULL.
+      if (decimalColVector.noNulls || !decimalColVector.isNull[0]) {
+        firstValue.set(decimalColVector.vector[0]);
+        isGroupResultNull = false;
+      }
+      haveFirstValue = true;
+    }
+
+    // First value is repeated for all batches.
+    DecimalColumnVector outputColVector =
+        (DecimalColumnVector) batch.cols[outputColumnNum];
+    outputColVector.isRepeating = true;
+    if (isGroupResultNull) {
+      outputColVector.noNulls = false;
+      outputColVector.isNull[0] = true;
+    } else {
+      outputColVector.noNulls = true;
+      outputColVector.isNull[0] = false;
+      outputColVector.vector[0].set(firstValue);
+    }
+  }
+
+  /**
+   * This evaluator writes its result into the output column from evaluateGroupBatch.
+   */
+  @Override
+  public boolean streamsResult() {
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    haveFirstValue = false;
+    isGroupResultNull = true;
+    firstValue.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
new file mode 100644
index 0000000..8a50476
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.FastHiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal last_value() for a PTF group.
+ *
+ * We capture the last value from the last batch.  It can be NULL.
+ * It becomes the group value.
+ */
+public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorPTFEvaluatorDecimalLastValue.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable lastValue;
+
+  public VectorPTFEvaluatorDecimalLastValue(WindowFrameDef windowFrameDef,
+      VectorExpression inputVecExpr, int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    lastValue = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  /**
+   * Captures the value (or NULL) of the last row of the last group batch as the group result.
+   * Batches other than the last one, and an empty last batch, leave the state unchanged.
+   *
+   * @param batch one of the group's batches; must not have selectedInUse set
+   * @param isLastGroupBatch true when this is the final batch of the group
+   */
+  @Override
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Last row of last batch determines isGroupResultNull and decimal lastValue.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    if (!isLastGroupBatch) {
+      return;
+    }
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector =
+        (DecimalColumnVector) batch.cols[inputColumnNum];
+
+    // A repeating column keeps its single value in entry 0; otherwise use the last row.
+    final int lastBatchIndex = decimalColVector.isRepeating ? 0 : size - 1;
+
+    // The entry is NULL only when noNulls is false AND its isNull flag is set; noNulls being
+    // false by itself does not make the entry NULL.
+    if (decimalColVector.noNulls || !decimalColVector.isNull[lastBatchIndex]) {
+      lastValue.set(decimalColVector.vector[lastBatchIndex]);
+      isGroupResultNull = false;
+    } else {
+      isGroupResultNull = true;
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimalGroupResult() {
+    return lastValue;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    lastValue.set(HiveDecimal.ZERO);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
new file mode 100644
index 0000000..3c59268
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.FastHiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal max() for a PTF group.
+ */
+public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = 
VectorPTFEvaluatorDecimalMax.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable max;
+
+  public VectorPTFEvaluatorDecimalMax(WindowFrameDef windowFrameDef, 
VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    max = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Determine maximum of all non-null decimal column values; maintain 
isGroupResultNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) 
batch.cols[inputColumnNum]);
+    if (decimalColVector.isRepeating) {
+      if (decimalColVector.noNulls) {
+        if (isGroupResultNull) {
+          max.set(decimalColVector.vector[0]);
+          isGroupResultNull = false;
+        } else {
+          HiveDecimalWritable repeatedMax = decimalColVector.vector[0];
+          if (repeatedMax.compareTo(max) == 1) {
+            max.set(repeatedMax);
+          }
+        }
+      }
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+        max.set(vector[0]);
+        isGroupResultNull = false;
+      } else {
+        final HiveDecimalWritable dec = vector[0];
+        if (dec.compareTo(max) == 1) {
+          max.set(dec);
+        }
+      }
+      for (int i = 1; i < size; i++) {
+        final HiveDecimalWritable dec = vector[i];
+        if (dec.compareTo(max) == 1) {
+          max.set(dec);
+        }
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      int i = 0;
+      while (batchIsNull[i]) {
+        if (++i >= size) {
+          return;
+        }
+      }
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+        max.set(vector[i++]);
+        isGroupResultNull = false;
+      } else {
+        final HiveDecimalWritable dec = vector[i++];
+        if (dec.compareTo(max) == 1) {
+          max.set(dec);
+        }
+      }
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          final HiveDecimalWritable dec = vector[i];
+          if (dec.compareTo(max) == 1) {
+            max.set(dec);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimalGroupResult() {
+    return max;
+  }
+
+  private static HiveDecimal MIN_VALUE = 
HiveDecimal.create("-99999999999999999999999999999999999999");
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    max.set(MIN_VALUE);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
new file mode 100644
index 0000000..0f7ea04
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.FastHiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal min() for a PTF group.
+ */
+public class VectorPTFEvaluatorDecimalMin extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = 
VectorPTFEvaluatorDecimalMin.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable min;
+
+  public VectorPTFEvaluatorDecimalMin(WindowFrameDef windowFrameDef, 
VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    min = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Determine minimum of all non-null decimal column values; maintain 
isGroupResultNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) 
batch.cols[inputColumnNum]);
+    if (decimalColVector.isRepeating) {
+      if (decimalColVector.noNulls) {
+        if (isGroupResultNull) {
+          min.set(decimalColVector.vector[0]);
+          isGroupResultNull = false;
+        } else {
+          HiveDecimalWritable repeatedMin = decimalColVector.vector[0];
+          if (repeatedMin.compareTo(min) == -1) {
+            min.set(repeatedMin);
+          }
+        }
+      }
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+        min.set(vector[0]);
+        isGroupResultNull = false;
+      } else {
+        final HiveDecimalWritable dec = vector[0];
+        if (dec.compareTo(min) == -1) {
+          min.set(dec);
+        }
+      }
+      for (int i = 1; i < size; i++) {
+        final HiveDecimalWritable dec = vector[i];
+        if (dec.compareTo(min) == -1) {
+          min.set(dec);
+        }
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      int i = 0;
+      while (batchIsNull[i]) {
+        if (++i >= size) {
+          return;
+        }
+      }
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+        min.set(vector[i++]);
+        isGroupResultNull = false;
+      } else {
+        final HiveDecimalWritable dec = vector[i++];
+        if (dec.compareTo(min) == -1) {
+          min.set(dec);
+        }
+      }
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          final HiveDecimalWritable dec = vector[i];
+          if (dec.compareTo(min) == -1) {
+            min.set(dec);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimalGroupResult() {
+    return min;
+  }
+
+  private static HiveDecimal MAX_VALUE = 
HiveDecimal.create("99999999999999999999999999999999999999");
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    min.set(MAX_VALUE);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
new file mode 100644
index 0000000..8300781
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates HiveDecimal sum() for a PTF group.
+ */
+public class VectorPTFEvaluatorDecimalSum extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = 
VectorPTFEvaluatorDecimalSum.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected boolean isGroupResultNull;
+  protected HiveDecimalWritable sum;
+  protected HiveDecimalWritable temp;
+
+  public VectorPTFEvaluatorDecimalSum(WindowFrameDef windowFrameDef, 
VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    sum = new HiveDecimalWritable();
+    temp = new HiveDecimalWritable();
+    resetEvaluator();
+  }
+
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Sum all non-null decimal column values; maintain isGroupResultNull.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DecimalColumnVector decimalColVector = ((DecimalColumnVector) 
batch.cols[inputColumnNum]);
+    if (decimalColVector.isRepeating) {
+
+      if (decimalColVector.noNulls) {
+        temp.setFromLong(batch.size);
+        if (isGroupResultNull) {
+
+          // First aggregation calculation for group.
+          sum.set(decimalColVector.vector[0]);
+          sum.mutateMultiply(temp);
+          isGroupResultNull = false;
+        } else {
+          temp.mutateMultiply(decimalColVector.vector[0]);
+          sum.mutateAdd(temp);
+        }
+      }
+    } else if (decimalColVector.noNulls) {
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+
+        // First aggregation calculation for group.
+        sum.set(vector[0]);
+        isGroupResultNull = false;
+      } else {
+        sum.mutateAdd(vector[0]);
+      }
+      for (int i = 1; i < size; i++) {
+        sum.mutateAdd(vector[i]);
+      }
+    } else {
+      boolean[] batchIsNull = decimalColVector.isNull;
+      int i = 0;
+      while (batchIsNull[i]) {
+        if (++i >= size) {
+          return;
+        }
+      }
+      HiveDecimalWritable[] vector = decimalColVector.vector;
+      if (isGroupResultNull) {
+
+        // First aggregation calculation for group.
+        sum.set(vector[i++]);
+        isGroupResultNull = false;
+      } else {
+        sum.mutateAdd(vector[i++]);
+      }
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          sum.mutateAdd(vector[i]);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DECIMAL;
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimalGroupResult() {
+    return sum;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    sum.set(HiveDecimal.ZERO);;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
new file mode 100644
index 0000000..62f7aa5
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+/**
+ * This class evaluates dense_rank() for a PTF group.
+ *
+ * Dense rank starts at 1; the same dense rank is streamed to the output 
column as repeated; after
+ * the last group row, the dense rank is incremented by 1.
+ */
+public class VectorPTFEvaluatorDenseRank extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = 
VectorPTFEvaluatorDenseRank.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  private int denseRank;
+
+  public VectorPTFEvaluatorDenseRank(WindowFrameDef windowFrameDef, 
VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    LongColumnVector longColVector = (LongColumnVector) 
batch.cols[outputColumnNum];
+    longColVector.isRepeating = true;
+    longColVector.noNulls = true;
+    longColVector.isNull[0] = false;
+    longColVector.vector[0] = denseRank;
+
+    if (isLastGroupBatch) {
+      denseRank++;
+    }
+  }
+
+  public boolean streamsResult() {
+    // No group value.
+    return true;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.LONG;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    denseRank = 1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
new file mode 100644
index 0000000..2c379d7
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.ptf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class evaluates double avg() for a PTF group.
+ *
+ * Sum up non-null column values; group result is sum / non-null count.
+ */
+public class VectorPTFEvaluatorDoubleAvg extends VectorPTFEvaluatorBase {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = 
VectorPTFEvaluatorDoubleAvg.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected boolean isGroupResultNull;
+  protected double sum;
+  private int nonNullGroupCount;
+  private double avg;
+
+  public VectorPTFEvaluatorDoubleAvg(WindowFrameDef windowFrameDef, 
VectorExpression inputVecExpr,
+      int outputColumnNum) {
+    super(windowFrameDef, inputVecExpr, outputColumnNum);
+    resetEvaluator();
+  }
+
+  public void evaluateGroupBatch(VectorizedRowBatch batch, boolean 
isLastGroupBatch) {
+    evaluateInputExpr(batch);
+
+    // Sum all non-null double column values for avg; maintain 
isGroupResultNull; after last row of
+    // last group batch compute the group avg when sum is non-null.
+
+    // We do not filter when PTF is in reducer.
+    Preconditions.checkState(!batch.selectedInUse);
+
+    final int size = batch.size;
+    if (size == 0) {
+      return;
+    }
+    DoubleColumnVector doubleColVector = ((DoubleColumnVector) 
batch.cols[inputColumnNum]);
+    if (doubleColVector.isRepeating) {
+
+      if (doubleColVector.noNulls) {
+
+        // We have a repeated value.  The sum increases by value * batch.size.
+        if (isGroupResultNull) {
+
+          // First aggregation calculation for group.
+          sum = doubleColVector.vector[0] * batch.size;
+          isGroupResultNull = false;
+        } else {
+          sum += doubleColVector.vector[0] * batch.size;
+        }
+        nonNullGroupCount += size;
+      }
+    } else if (doubleColVector.noNulls) {
+      double[] vector = doubleColVector.vector;
+      double varSum = vector[0];
+      for (int i = 1; i < size; i++) {
+        varSum += vector[i];
+      }
+      nonNullGroupCount += size;
+      if (isGroupResultNull) {
+
+        // First aggregation calculation for group.
+        sum = varSum;
+        isGroupResultNull = false;
+      } else {
+        sum += varSum;
+      }
+    } else {
+      boolean[] batchIsNull = doubleColVector.isNull;
+      int i = 0;
+      while (batchIsNull[i]) {
+        if (++i >= size) {
+          return;
+        }
+      }
+      double[] vector = doubleColVector.vector;
+      double varSum = vector[i++];
+      nonNullGroupCount++;
+      for (; i < size; i++) {
+        if (!batchIsNull[i]) {
+          varSum += vector[i];
+          nonNullGroupCount++;
+        }
+      }
+      if (isGroupResultNull) {
+
+        // First aggregation calculation for group.
+        sum = varSum;
+        isGroupResultNull = false;
+      } else {
+        sum += varSum;
+      }
+    }
+
+    if (isLastGroupBatch) {
+      if (!isGroupResultNull) {
+        avg = sum / nonNullGroupCount;
+      }
+    }
+  }
+
+  @Override
+  public boolean isGroupResultNull() {
+    return isGroupResultNull;
+  }
+
+  @Override
+  public Type getResultColumnVectorType() {
+    return Type.DOUBLE;
+  }
+
+  @Override
+  public double getDoubleGroupResult() {
+    return avg;
+  }
+
+  @Override
+  public void resetEvaluator() {
+    isGroupResultNull = true;
+    sum = 0.0;
+    nonNullGroupCount = 0;
+    avg = 0.0;
+  }
+}
\ No newline at end of file

Reply via email to