HIVE-16369: Vectorization: Support PTF (Part 1: No Custom Window Framing -- Default Only) (Matt McCline, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a0df0ace Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a0df0ace Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a0df0ace Branch: refs/heads/master Commit: a0df0ace2ac179329f39c43ad0b67da9ba3ba95b Parents: fed915c Author: Matt McCline <mmccl...@hortonworks.com> Authored: Thu Jul 20 05:16:08 2017 -0500 Committer: Matt McCline <mmccl...@hortonworks.com> Committed: Thu Jul 20 05:16:08 2017 -0500 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../apache/hadoop/hive/ql/exec/Operator.java | 7 + .../hadoop/hive/ql/exec/OperatorFactory.java | 3 + .../ql/exec/spark/SparkReduceRecordHandler.java | 16 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 62 +- .../ql/exec/vector/VectorGroupByOperator.java | 61 +- .../ql/exec/vector/VectorSelectOperator.java | 8 + .../ql/exec/vector/VectorizationContext.java | 9 +- .../ql/exec/vector/VectorizedBatchUtil.java | 61 +- .../exec/vector/ptf/VectorPTFEvaluatorBase.java | 122 + .../vector/ptf/VectorPTFEvaluatorCount.java | 108 + .../vector/ptf/VectorPTFEvaluatorCountStar.java | 79 + .../ptf/VectorPTFEvaluatorDecimalAvg.java | 162 + .../VectorPTFEvaluatorDecimalFirstValue.java | 117 + .../ptf/VectorPTFEvaluatorDecimalLastValue.java | 113 + .../ptf/VectorPTFEvaluatorDecimalMax.java | 146 + .../ptf/VectorPTFEvaluatorDecimalMin.java | 146 + .../ptf/VectorPTFEvaluatorDecimalSum.java | 140 + .../vector/ptf/VectorPTFEvaluatorDenseRank.java | 77 + .../vector/ptf/VectorPTFEvaluatorDoubleAvg.java | 153 + .../ptf/VectorPTFEvaluatorDoubleFirstValue.java | 113 + .../ptf/VectorPTFEvaluatorDoubleLastValue.java | 109 + .../vector/ptf/VectorPTFEvaluatorDoubleMax.java | 136 + .../vector/ptf/VectorPTFEvaluatorDoubleMin.java | 136 + .../vector/ptf/VectorPTFEvaluatorDoubleSum.java | 134 + .../vector/ptf/VectorPTFEvaluatorLongAvg.java | 153 + 
.../ptf/VectorPTFEvaluatorLongFirstValue.java | 113 + .../ptf/VectorPTFEvaluatorLongLastValue.java | 109 + .../vector/ptf/VectorPTFEvaluatorLongMax.java | 136 + .../vector/ptf/VectorPTFEvaluatorLongMin.java | 136 + .../vector/ptf/VectorPTFEvaluatorLongSum.java | 134 + .../exec/vector/ptf/VectorPTFEvaluatorRank.java | 81 + .../vector/ptf/VectorPTFEvaluatorRowNumber.java | 77 + .../exec/vector/ptf/VectorPTFGroupBatches.java | 216 + .../ql/exec/vector/ptf/VectorPTFOperator.java | 570 + .../hive/ql/optimizer/physical/Vectorizer.java | 522 +- .../org/apache/hadoop/hive/ql/plan/PTFDesc.java | 112 + .../hadoop/hive/ql/plan/VectorPTFDesc.java | 360 + .../hadoop/hive/ql/plan/VectorPTFInfo.java | 161 + .../hadoop/hive/ql/plan/ptf/WindowFrameDef.java | 2 +- .../hive/ql/udf/generic/GenericUDFLeadLag.java | 4 +- .../clientpositive/vector_groupby_cube1.q | 17 +- .../vector_groupby_grouping_id1.q | 21 + .../vector_groupby_grouping_id2.q | 52 + .../vector_groupby_grouping_window.q | 1 + .../vector_outer_reference_windowed.q | 91 + .../clientpositive/vector_ptf_part_simple.q | 240 +- .../queries/clientpositive/vector_windowing.q | 791 ++ .../vector_windowing_expressions.q | 94 + .../clientpositive/vector_windowing_gby.q | 21 + .../clientpositive/vector_windowing_gby2.q | 48 + .../vector_windowing_multipartitioning.q | 73 + .../clientpositive/vector_windowing_navfn.q | 2 + .../vector_windowing_order_null.q | 58 + .../vector_windowing_range_multiorder.q | 68 + .../clientpositive/vector_windowing_rank.q | 117 + .../clientpositive/vector_windowing_streaming.q | 85 + .../vector_windowing_windowspec.q | 70 + .../vector_windowing_windowspec4.q | 37 + .../queries/clientpositive/vectorized_ptf.q | 1 + .../clientpositive/correlationoptimizer12.q.out | 4 +- .../results/clientpositive/ctas_colname.q.out | 4 +- .../clientpositive/distinct_windowing.q.out | 8 +- .../distinct_windowing_no_cbo.q.out | 12 +- .../groupby_grouping_window.q.out | 2 +- .../llap/groupby_resolution.q.out | 2 +- 
.../test/results/clientpositive/llap/ptf.q.out | 114 +- .../clientpositive/llap/ptf_streaming.q.out | 62 +- .../clientpositive/llap/subquery_in.q.out | 4 +- .../clientpositive/llap/subquery_notin.q.out | 14 +- .../clientpositive/llap/subquery_scalar.q.out | 4 +- .../llap/vector_groupby_cube1.q.out | 176 +- .../llap/vector_groupby_grouping_id1.q.out | 800 ++ .../llap/vector_groupby_grouping_id2.q.out | 2117 +++ .../llap/vector_groupby_grouping_window.q.out | 4 +- .../llap/vector_outer_reference_windowed.q.out | 1435 ++ .../llap/vector_ptf_part_simple.q.out | 3880 ++++- .../llap/vector_tablesample_rows.q.out | 68 +- .../clientpositive/llap/vector_windowing.q.out | 9768 +++++++++++++ .../llap/vector_windowing_expressions.q.out | 2022 +++ .../llap/vector_windowing_gby.q.out | 314 + .../llap/vector_windowing_gby2.q.out | 1158 ++ .../vector_windowing_multipartitioning.q.out | 11587 +++++++++++++++ .../llap/vector_windowing_navfn.q.out | 218 +- .../llap/vector_windowing_order_null.q.out | 1232 ++ .../vector_windowing_range_multiorder.q.out | 12584 +++++++++++++++++ .../llap/vector_windowing_rank.q.out | 1860 +++ .../llap/vector_windowing_streaming.q.out | 1036 ++ .../llap/vector_windowing_windowspec.q.out | 2374 ++++ .../llap/vector_windowing_windowspec4.q.out | 223 + .../llap/vectorization_limit.q.out | 24 +- .../llap/vectorized_mapjoin2.q.out | 4 +- .../clientpositive/llap/vectorized_ptf.q.out | 294 +- .../results/clientpositive/llap/windowing.q.out | 2 +- .../llap/windowing_windowspec.q.out | 955 ++ .../outer_reference_windowed.q.out | 10 +- ql/src/test/results/clientpositive/pcs.q.out | 2 +- .../results/clientpositive/ppd_windowing1.q.out | 66 +- .../results/clientpositive/ptfgroupbyjoin.q.out | 2 +- .../results/clientpositive/quotedid_basic.q.out | 4 +- .../test/results/clientpositive/semijoin2.q.out | 4 +- .../test/results/clientpositive/semijoin4.q.out | 2 +- .../test/results/clientpositive/semijoin5.q.out | 4 +- .../spark/groupby_resolution.q.out | 2 +- 
.../test/results/clientpositive/spark/ptf.q.out | 114 +- .../clientpositive/spark/ptf_streaming.q.out | 62 +- .../clientpositive/spark/subquery_in.q.out | 4 +- .../spark/union_remove_6_subq.q.out | 2 +- .../clientpositive/spark/vectorized_ptf.q.out | 294 +- .../clientpositive/spark/windowing.q.out | 2 +- .../clientpositive/subquery_in_having.q.out | 2 +- .../subquery_unqualcolumnrefs.q.out | 4 +- .../clientpositive/union_remove_6_subq.q.out | 2 +- .../vector_outer_reference_windowed.q.out | 1221 ++ .../clientpositive/vector_ptf_part_simple.q.out | 2895 ++++ .../clientpositive/vector_windowing.q.out | 9149 ++++++++++++ .../vector_windowing_expressions.q.out | 1822 +++ .../clientpositive/vector_windowing_gby.q.out | 252 + .../clientpositive/vector_windowing_gby2.q.out | 1015 ++ .../vector_windowing_multipartitioning.q.out | 11288 +++++++++++++++ .../vector_windowing_order_null.q.out | 989 ++ .../vector_windowing_range_multiorder.q.out | 12239 ++++++++++++++++ .../clientpositive/vector_windowing_rank.q.out | 1543 ++ .../vector_windowing_streaming.q.out | 836 ++ .../vector_windowing_windowspec.q.out | 2060 +++ .../vector_windowing_windowspec4.q.out | 211 + .../results/clientpositive/windowing_gby2.q.out | 12 +- .../clientpositive/windowing_navfn.q.out | 2 +- .../clientpositive/windowing_streaming.q.out | 6 +- 129 files changed, 106272 insertions(+), 1210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9c954be..69205bc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2839,6 +2839,10 @@ public class HiveConf extends Configuration 
{ "1. chosen : use VectorUDFAdaptor for a small set of UDFs that were choosen for good performance\n" + "2. all : use VectorUDFAdaptor for all UDFs" ), + HIVE_VECTORIZATION_PTF_ENABLED("hive.vectorized.execution.ptf.enabled", false, + "This flag should be set to true to enable vectorized mode of the PTF of query execution.\n" + + "The default value is false."), + HIVE_VECTORIZATION_COMPLEX_TYPES_ENABLED("hive.vectorized.complex.types.enabled", true, "This flag should be set to true to enable vectorization\n" + "of expressions with complex types.\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 7f646c4..66c34aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -637,6 +637,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C defaultEndGroup(); } + // Tell the operator the status of the next key-grouped VectorizedRowBatch that will be delivered + // to the process method. E.g. by reduce-shuffle. These semantics are needed by PTF so it can + // efficiently add computed values to the last batch of a group key. + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + // Do nothing. + } + // an blocking operator (e.g. 
GroupByOperator and JoinOperator) can // override this method to forward its outputs public void flush() throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index af1fa66..993da83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator; +import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; @@ -138,6 +140,7 @@ public final class OperatorFactory { vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class); vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class); vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class); + vectorOpvec.put(PTFDesc.class, VectorPTFOperator.class); vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class); } http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index 36158a1..7c1164b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -442,13 +442,13 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { // Flush current group batch as last batch of group. if (batch.size > 0) { + // Indicate last batch of current group. + reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true); + // Forward; reset key and value columns. forwardBatch(/* resetValueColumnsOnly */ false); - reducer.endGroup(); } - reducer.startGroup(); - // Deserialize group key into vector row columns. byte[] keyBytes = keyWritable.getBytes(); int keyLength = keyWritable.getLength(); @@ -475,6 +475,9 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { if (batch.size >= batch.getMaxSize() || batch.size > 0 && batchBytes >= BATCH_BYTES) { + // We have a row for current group, so we indicate not the last batch. + reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ false); + // Batch is full or using too much space. forwardBatch(/* resetValueColumnsOnly */ true); } @@ -581,10 +584,13 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { try { if (vectorized) { if (batch.size > 0) { - forwardBatch(/* resetValueColumnsOnly */ false); + if (handleGroupKey) { - reducer.endGroup(); + // Indicate last batch of current group. 
+ reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true); } + + forwardBatch(/* resetValueColumnsOnly */ false); } } else { if (groupKey != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 43f9db3..bdde81a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -369,20 +369,6 @@ public class ReduceRecordSource implements RecordSource { BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey(); valueWritables = reader.getCurrentValues(); - // Check if this is a new group or same group - if (handleGroupKey && !keyWritable.equals(this.groupKey)) { - // If a operator wants to do some work at the beginning of a group - if (groupKey == null) { // the first group - this.groupKey = new BytesWritable(); - } else { - // If a operator wants to do some work at the end of a group - reducer.endGroup(); - } - - groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength()); - reducer.startGroup(); - } - processVectorGroup(keyWritable, valueWritables, tag); return true; } catch (Throwable e) { @@ -398,15 +384,20 @@ public class ReduceRecordSource implements RecordSource { } /** + * + * @param keyWritable * @param values - * @return true if it is not done and can take more inputs + * @param tag + * @throws HiveException + * @throws IOException */ private void processVectorGroup(BytesWritable keyWritable, Iterable<Object> values, byte tag) throws HiveException, IOException { + Preconditions.checkState(batch.size == 0); + // Deserialize key into vector row columns. 
- // Since we referencing byte column vector byte arrays by reference, we don't need - // a data buffer. + // byte[] keyBytes = keyWritable.getBytes(); int keyLength = keyWritable.getLength(); @@ -432,21 +423,14 @@ public class ReduceRecordSource implements RecordSource { int batchBytes = keyBytes.length; try { for (Object value : values) { - if (valueLazyBinaryDeserializeToRow != null) { - // Deserialize value into vector row columns. - BytesWritable valueWritable = (BytesWritable) value; - byte[] valueBytes = valueWritable.getBytes(); - int valueLength = valueWritable.getLength(); - batchBytes += valueLength; - - valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength); - valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx); - } - rowIdx++; - if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) { + if (rowIdx >= maxSize || + (rowIdx > 0 && batchBytes >= BATCH_BYTES)) { - // Batch is full. + // Batch is full AND we have at least 1 more row... batch.size = rowIdx; + if (handleGroupKey) { + reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ false); + } reducer.process(batch, tag); // Reset just the value columns and value buffer. @@ -455,12 +439,26 @@ public class ReduceRecordSource implements RecordSource { batch.cols[i].reset(); } rowIdx = 0; - batchBytes = 0; + batchBytes = keyBytes.length; } + if (valueLazyBinaryDeserializeToRow != null) { + // Deserialize value into vector row columns. + BytesWritable valueWritable = (BytesWritable) value; + byte[] valueBytes = valueWritable.getBytes(); + int valueLength = valueWritable.getLength(); + batchBytes += valueLength; + + valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength); + valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx); + } + rowIdx++; } if (rowIdx > 0) { // Flush final partial batch. 
- VectorizedBatchUtil.setBatchSize(batch, rowIdx); + batch.size = rowIdx; + if (handleGroupKey) { + reducer.setNextVectorBatchGroupStatus(/* isLastGroupBatch */ true); + } reducer.process(batch, tag); } batch.reset(); http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 613a31a..31f2621 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -148,8 +148,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements */ private static interface IProcessingMode { public void initialize(Configuration hconf) throws HiveException; - public void startGroup() throws HiveException; - public void endGroup() throws HiveException; + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException; public void processBatch(VectorizedRowBatch batch) throws HiveException; public void close(boolean aborted) throws HiveException; } @@ -159,14 +158,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements */ private abstract class ProcessingModeBase implements IProcessingMode { - // Overridden and used in sorted reduce group batch processing mode. + // Overridden and used in ProcessingModeReduceMergePartial mode. @Override - public void startGroup() throws HiveException { - // Do nothing. - } - @Override - public void endGroup() throws HiveException { - // Do nothing. + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + // Some Spark plans cause Hash and other modes to get this. So, ignore it. 
} protected abstract void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, @@ -258,6 +253,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } @Override + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + // Do nothing. + } + + @Override public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException { for (int i = 0; i < aggregators.length; ++i) { @@ -682,6 +682,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } @Override + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + // Do nothing. + } + + @Override public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException { @@ -770,8 +775,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements */ private class ProcessingModeReduceMergePartial extends ProcessingModeBase { - private boolean inGroup; private boolean first; + private boolean isLastGroupBatch; /** * The group vector key helper. @@ -790,7 +795,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements @Override public void initialize(Configuration hconf) throws HiveException { - inGroup = false; + isLastGroupBatch = true; // We do not include the dummy grouping set column in the output. 
So we pass outputKeyLength // instead of keyExpressions.length @@ -802,24 +807,18 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } @Override - public void startGroup() throws HiveException { - inGroup = true; - first = true; - } - - @Override - public void endGroup() throws HiveException { - if (inGroup && !first) { - writeGroupRow(groupAggregators, buffer); - groupAggregators.reset(); + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + if (this.isLastGroupBatch) { + // Previous batch was the last of a group of batches. Remember the next is the first batch + // of a new group of batches. + first = true; } - inGroup = false; + this.isLastGroupBatch = isLastGroupBatch; } @Override public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException { - assert(inGroup); if (first) { // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group. 
first = false; @@ -836,11 +835,16 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements for (int i = 0; i < aggregators.length; ++i) { aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch); } + + if (isLastGroupBatch) { + writeGroupRow(groupAggregators, buffer); + groupAggregators.reset(); + } } @Override public void close(boolean aborted) throws HiveException { - if (!aborted && inGroup && !first) { + if (!aborted && !first && !isLastGroupBatch) { writeGroupRow(groupAggregators, buffer); } } @@ -1013,21 +1017,26 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } @Override + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + processingMode.setNextVectorBatchGroupStatus(isLastGroupBatch); + } + + @Override public void startGroup() throws HiveException { - processingMode.startGroup(); // We do not call startGroup on operators below because we are batching rows in // an output batch and the semantics will not work. // super.startGroup(); + throw new HiveException("Unexpected startGroup"); } @Override public void endGroup() throws HiveException { - processingMode.endGroup(); // We do not call endGroup on operators below because we are batching rows in // an output batch and the semantics will not work. 
// super.endGroup(); + throw new HiveException("Unexpected startGroup"); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 17ccf21..5f1f952 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -110,6 +110,14 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements outputFieldNames, objectInspectors); } + // Must send on to VectorPTFOperator... + @Override + public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException { + for (Operator<? extends OperatorDesc> op : childOperators) { + op.setNextVectorBatchGroupStatus(isLastGroupBatch); + } + } + @Override public void process(Object row, int tag) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 503bd0c..9e026f0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -432,7 +432,7 @@ public class VectorizationContext { return udfsNeedingImplicitDecimalCast.contains(udfClass); } - protected int getInputColumnIndex(String name) throws HiveException { + public int getInputColumnIndex(String name) throws HiveException { if (name == null) { throw new 
HiveException("Null column name"); } @@ -464,7 +464,7 @@ public class VectorizationContext { private final Set<Integer> usedOutputColumns = new HashSet<Integer>(); - int allocateOutputColumn(TypeInfo typeInfo) throws HiveException { + int allocateOutputColumn(TypeInfo typeInfo) { if (initialOutputCol < 0) { // This is a test calling. return 0; @@ -525,7 +525,7 @@ public class VectorizationContext { } } - public int allocateScratchColumn(TypeInfo typeInfo) throws HiveException { + public int allocateScratchColumn(TypeInfo typeInfo) { return ocm.allocateOutputColumn(typeInfo); } @@ -2672,8 +2672,7 @@ public class VectorizationContext { } } - static String getScratchName(TypeInfo typeInfo) throws HiveException { - + static String getScratchName(TypeInfo typeInfo) { // For now, leave DECIMAL precision/scale in the name so DecimalColumnVector scratch columns // don't need their precision/scale adjusted... if (typeInfo.getCategory() == Category.PRIMITIVE && http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 990e896..03c09e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -579,7 +579,7 @@ public class VectorizedBatchUtil { return typeInfoList.toArray(new TypeInfo[0]); } - static ColumnVector cloneColumnVector(ColumnVector source + public static ColumnVector makeLikeColumnVector(ColumnVector source ) throws HiveException{ if (source instanceof LongColumnVector) { return new LongColumnVector(((LongColumnVector) source).vector.length); @@ -598,25 +598,25 @@ public class VectorizedBatchUtil { return new 
IntervalDayTimeColumnVector(((IntervalDayTimeColumnVector) source).getLength()); } else if (source instanceof ListColumnVector) { ListColumnVector src = (ListColumnVector) source; - ColumnVector child = cloneColumnVector(src.child); + ColumnVector child = makeLikeColumnVector(src.child); return new ListColumnVector(src.offsets.length, child); } else if (source instanceof MapColumnVector) { MapColumnVector src = (MapColumnVector) source; - ColumnVector keys = cloneColumnVector(src.keys); - ColumnVector values = cloneColumnVector(src.values); + ColumnVector keys = makeLikeColumnVector(src.keys); + ColumnVector values = makeLikeColumnVector(src.values); return new MapColumnVector(src.offsets.length, keys, values); } else if (source instanceof StructColumnVector) { StructColumnVector src = (StructColumnVector) source; ColumnVector[] copy = new ColumnVector[src.fields.length]; for(int i=0; i < copy.length; ++i) { - copy[i] = cloneColumnVector(src.fields[i]); + copy[i] = makeLikeColumnVector(src.fields[i]); } return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, copy); } else if (source instanceof UnionColumnVector) { UnionColumnVector src = (UnionColumnVector) source; ColumnVector[] copy = new ColumnVector[src.fields.length]; for(int i=0; i < copy.length; ++i) { - copy[i] = cloneColumnVector(src.fields[i]); + copy[i] = makeLikeColumnVector(src.fields[i]); } return new UnionColumnVector(src.tags.length, copy); } else @@ -625,6 +625,53 @@ public class VectorizedBatchUtil { " is not supported!"); } + public static void swapColumnVector( + VectorizedRowBatch batch1, int batch1ColumnNum, + VectorizedRowBatch batch2, int batch2ColumnNum) { + ColumnVector colVector1 = batch1.cols[batch1ColumnNum]; + batch1.cols[batch1ColumnNum] = batch2.cols[batch2ColumnNum]; + batch2.cols[batch2ColumnNum] = colVector1; + } + + public static void copyRepeatingColumn(VectorizedRowBatch sourceBatch, int sourceColumnNum, + VectorizedRowBatch targetBatch, int targetColumnNum, boolean 
setByValue) { + ColumnVector sourceColVector = sourceBatch.cols[sourceColumnNum]; + ColumnVector targetColVector = targetBatch.cols[targetColumnNum]; + + targetColVector.isRepeating = true; + + if (!sourceColVector.noNulls) { + targetColVector.noNulls = false; + targetColVector.isNull[0] = true; + return; + } + + if (sourceColVector instanceof LongColumnVector) { + ((LongColumnVector) targetColVector).vector[0] = ((LongColumnVector) sourceColVector).vector[0]; + } else if (sourceColVector instanceof DoubleColumnVector) { + ((DoubleColumnVector) targetColVector).vector[0] = ((DoubleColumnVector) sourceColVector).vector[0]; + } else if (sourceColVector instanceof BytesColumnVector) { + BytesColumnVector bytesColVector = (BytesColumnVector) sourceColVector; + byte[] bytes = bytesColVector.vector[0]; + final int start = bytesColVector.start[0]; + final int length = bytesColVector.length[0]; + if (setByValue) { + ((BytesColumnVector) targetColVector).setVal(0, bytes, start, length); + } else { + ((BytesColumnVector) targetColVector).setRef(0, bytes, start, length); + } + } else if (sourceColVector instanceof DecimalColumnVector) { + ((DecimalColumnVector) targetColVector).set(0, ((DecimalColumnVector) sourceColVector).vector[0]); + } else if (sourceColVector instanceof TimestampColumnVector) { + ((TimestampColumnVector) targetColVector).set(0, ((TimestampColumnVector) sourceColVector).asScratchTimestamp(0)); + } else if (sourceColVector instanceof IntervalDayTimeColumnVector) { + ((IntervalDayTimeColumnVector) targetColVector).set(0, ((IntervalDayTimeColumnVector) sourceColVector).asScratchIntervalDayTime(0)); + } else { + throw new RuntimeException("Column vector class " + sourceColVector.getClass().getName() + + " is not supported!"); + } + } + /** * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty * @param batch the batch to imitate @@ -635,7 +682,7 @@ public class VectorizedBatchUtil { VectorizedRowBatch newBatch = new 
VectorizedRowBatch(batch.numCols); for (int i = 0; i < batch.numCols; i++) { if (batch.cols[i] != null) { - newBatch.cols[i] = cloneColumnVector(batch.cols[i]); + newBatch.cols[i] = makeLikeColumnVector(batch.cols[i]); newBatch.cols[i].init(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java new file mode 100644 index 0000000..beca5f9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorBase.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +/** + * This is the vector PTF evaluator base class. An evaluator does the group batch aggregation work + * on an aggregation's 0 or 1 argument(s) and at some point will fill in an output column with the + * aggregation result. The aggregation argument is an input column or expression, or no argument. + * + * When the aggregation is streaming (e.g. row_number, rank, first_value, etc), the output column + * can be filled in immediately by the implementation of evaluateGroupBatch. + * + * For non-streaming aggregations, the aggregation result is not known until the last group batch + * is processed. After the last group batch has been processed, the VectorPTFGroupBatches class + * will call the isGroupResultNull, getResultColumnVectorType, getLongGroupResult | + * getDoubleGroupResult | getDecimalGroupResult, and getOutputColumnNum methods to get aggregation + * result information necessary to write it into the output column (as a repeated column) of all + * the group batches. 
+ */ +public abstract class VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorBase.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected final WindowFrameDef windowFrameDef; + private final VectorExpression inputVecExpr; + protected final int inputColumnNum; + protected final int outputColumnNum; + + public VectorPTFEvaluatorBase(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + this.windowFrameDef = windowFrameDef; + if (inputVecExpr == null) { + inputColumnNum = -1; + this.inputVecExpr = null; + } else { + inputColumnNum = inputVecExpr.getOutputColumn(); + if (inputVecExpr instanceof IdentityExpression) { + this.inputVecExpr = null; + } else { + this.inputVecExpr = inputVecExpr; + } + } + this.outputColumnNum = outputColumnNum; + } + + // Evaluate the aggregation input argument expression. + public void evaluateInputExpr(VectorizedRowBatch batch) { + if (inputVecExpr != null) { + inputVecExpr.evaluate(batch); + } + } + + // Evaluate the aggregation over one of the group's batches. + public abstract void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch); + + // Returns true if the aggregation result will be streamed. + public boolean streamsResult() { + // Assume it is not streamjng by default. + return false; + } + + public int getOutputColumnNum() { + return outputColumnNum; + } + + // After processing all the group's batches with evaluateGroupBatch, is the non-streaming + // aggregation result null? + public boolean isGroupResultNull() { + return false; + } + + // What is the ColumnVector type of the aggregation result? + public abstract Type getResultColumnVectorType(); + + /* + * After processing all the non-streaming group's batches with evaluateGroupBatch and + * isGroupResultNull is false, the aggregation result value (based on getResultColumnVectorType). 
+ */ + + public long getLongGroupResult() { + throw new RuntimeException("No long group result evaluator implementation " + this.getClass().getName()); + } + + public double getDoubleGroupResult() { + throw new RuntimeException("No double group result evaluator implementation " + this.getClass().getName()); + } + + public HiveDecimalWritable getDecimalGroupResult() { + throw new RuntimeException("No decimal group result evaluator implementation " + this.getClass().getName()); + } + + // Resets the aggregation calculation variable(s). + public abstract void resetEvaluator(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java new file mode 100644 index 0000000..638fc9e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCount.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates count(column) for a PTF group. + * + * Count any rows of the group where the input column/expression is non-null. + */ +public class VectorPTFEvaluatorCount extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorCount.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected long count; + + public VectorPTFEvaluatorCount(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Count non-null column rows. + + // We do not filter when PTF is in reducer. 
+ Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + ColumnVector colVector = batch.cols[inputColumnNum]; + if (colVector.isRepeating) { + if (colVector.noNulls) { + count += size; + } + } else if (colVector.noNulls) { + count += size; + } else { + boolean[] batchIsNull = colVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + long varCount = 1; + i++; + for (; i < size; i++) { + if (!batchIsNull[i]) { + varCount++; + } + } + count += varCount; + } + } + + @Override + public boolean isGroupResultNull() { + return false; + } + + @Override + public Type getResultColumnVectorType() { + return Type.LONG; + } + + @Override + public long getLongGroupResult() { + return count; + } + + @Override + public void resetEvaluator() { + count = 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java new file mode 100644 index 0000000..cf8e626 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorCountStar.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates count(*) for a PTF group. + * + * Count all rows of the group. No input column/expression. + */ +public class VectorPTFEvaluatorCountStar extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorCountStar.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected long count; + + public VectorPTFEvaluatorCountStar(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + // No input expression for COUNT(*). + // evaluateInputExpr(batch); + + // Count all rows. 
+ + count += batch.size; + } + + @Override + public boolean isGroupResultNull() { + return false; + } + + @Override + public Type getResultColumnVectorType() { + return Type.LONG; + } + + @Override + public long getLongGroupResult() { + return count; + } + + @Override + public void resetEvaluator() { + count = 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java new file mode 100644 index 0000000..599e73b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalAvg.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal avg() for a PTF group. + * + * Sum up non-null column values; group result is sum / non-null count. + */ +public class VectorPTFEvaluatorDecimalAvg extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalAvg.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected HiveDecimalWritable sum; + private int nonNullGroupCount; + private HiveDecimalWritable temp; + private HiveDecimalWritable avg; + + public VectorPTFEvaluatorDecimalAvg(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + sum = new HiveDecimalWritable(); + temp = new HiveDecimalWritable(); + avg = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Sum all non-null decimal column values for avg; maintain isGroupResultNull; after last row of + // last group batch compute the group avg when sum is non-null. + + // We do not filter when PTF is in reducer. 
+ Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + + if (decimalColVector.noNulls) { + + // We have a repeated value. The sum increases by value * batch.size. + temp.setFromLong(batch.size); + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum.set(decimalColVector.vector[0]); + sum.mutateMultiply(temp); + isGroupResultNull = false; + } else { + temp.mutateMultiply(decimalColVector.vector[0]); + sum.mutateAdd(temp); + } + nonNullGroupCount += size; + } + } else if (decimalColVector.noNulls) { + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum.set(vector[0]); + isGroupResultNull = false; + } else { + sum.mutateAdd(vector[0]); + } + for (int i = 1; i < size; i++) { + sum.mutateAdd(vector[i]); + } + nonNullGroupCount += size; + } else { + boolean[] batchIsNull = decimalColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + + // First aggregation calculation for group. 
+ sum.set(vector[i++]); + isGroupResultNull = false; + } else { + sum.mutateAdd(vector[i++]); + } + nonNullGroupCount++; + for (; i < size; i++) { + if (!batchIsNull[i]) { + sum.mutateAdd(vector[i]); + nonNullGroupCount++; + } + } + } + + if (isLastGroupBatch) { + if (!isGroupResultNull) { + avg.set(sum); + temp.setFromLong(nonNullGroupCount); + avg.mutateDivide(temp); + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public HiveDecimalWritable getDecimalGroupResult() { + return avg; + } + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + sum.set(HiveDecimal.ZERO); + nonNullGroupCount = 0; + avg.set(HiveDecimal.ZERO); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java new file mode 100644 index 0000000..01a8c53 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalFirstValue.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.FastHiveDecimal; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal first_value() for a PTF group. + * + * We capture the first value from the first batch. It can be NULL. + * We then set (stream) the output column with that value as repeated in each batch. 
+ */ +public class VectorPTFEvaluatorDecimalFirstValue extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalFirstValue.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean haveFirstValue; + protected boolean isGroupResultNull; + protected HiveDecimalWritable firstValue; + + public VectorPTFEvaluatorDecimalFirstValue(WindowFrameDef windowFrameDef, + VectorExpression inputVecExpr, int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + firstValue = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // First row determines isGroupResultNull and decimal firstValue; stream fill result as repeated. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + if (!haveFirstValue) { + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + if (decimalColVector.noNulls) { + firstValue.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } + } else if (decimalColVector.noNulls) { + firstValue.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } else { + if (!decimalColVector.isNull[0]) { + firstValue.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } + } + haveFirstValue = true; + } + + // First value is repeated for all batches. 
+ DecimalColumnVector outputColVector = (DecimalColumnVector) batch.cols[outputColumnNum]; + outputColVector.isRepeating = true; + if (isGroupResultNull) { + outputColVector.noNulls = false; + outputColVector.isNull[0] = true; + } else { + outputColVector.noNulls = true; + outputColVector.isNull[0] = false; + outputColVector.vector[0].set(firstValue); + } + } + + public boolean streamsResult() { + return true; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public void resetEvaluator() { + haveFirstValue = false; + isGroupResultNull = true; + firstValue.set(HiveDecimal.ZERO); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java new file mode 100644 index 0000000..8a50476 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalLastValue.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.FastHiveDecimal; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal last_value() for a PTF group. + * + * We capture the last value from the last batch. It can be NULL. + * It becomes the group value. + */ +public class VectorPTFEvaluatorDecimalLastValue extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalLastValue.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected HiveDecimalWritable lastValue; + + public VectorPTFEvaluatorDecimalLastValue(WindowFrameDef windowFrameDef, + VectorExpression inputVecExpr, int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + lastValue = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Last row of last batch determines isGroupResultNull and decimal lastValue. + + // We do not filter when PTF is in reducer. 
+ Preconditions.checkState(!batch.selectedInUse); + + if (!isLastGroupBatch) { + return; + } + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + if (decimalColVector.noNulls) { + lastValue.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } else { + isGroupResultNull = true; + } + } else if (decimalColVector.noNulls) { + lastValue.set(decimalColVector.vector[size - 1]); + isGroupResultNull = false; + } else { + final int lastBatchIndex = size - 1; + if (!decimalColVector.isNull[lastBatchIndex]) { + lastValue.set(decimalColVector.vector[lastBatchIndex]); + isGroupResultNull = false; + } else { + isGroupResultNull = true; + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public HiveDecimalWritable getDecimalGroupResult() { + return lastValue; + } + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + lastValue.set(HiveDecimal.ZERO); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java new file mode 100644 index 0000000..3c59268 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMax.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.FastHiveDecimal; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal max() for a PTF group. 
+ */ +public class VectorPTFEvaluatorDecimalMax extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalMax.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected HiveDecimalWritable max; + + public VectorPTFEvaluatorDecimalMax(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + max = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Determine maximum of all non-null decimal column values; maintain isGroupResultNull. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + if (decimalColVector.noNulls) { + if (isGroupResultNull) { + max.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } else { + HiveDecimalWritable repeatedMax = decimalColVector.vector[0]; + if (repeatedMax.compareTo(max) == 1) { + max.set(repeatedMax); + } + } + } + } else if (decimalColVector.noNulls) { + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + max.set(vector[0]); + isGroupResultNull = false; + } else { + final HiveDecimalWritable dec = vector[0]; + if (dec.compareTo(max) == 1) { + max.set(dec); + } + } + for (int i = 1; i < size; i++) { + final HiveDecimalWritable dec = vector[i]; + if (dec.compareTo(max) == 1) { + max.set(dec); + } + } + } else { + boolean[] batchIsNull = decimalColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + HiveDecimalWritable[] vector = 
decimalColVector.vector; + if (isGroupResultNull) { + max.set(vector[i++]); + isGroupResultNull = false; + } else { + final HiveDecimalWritable dec = vector[i++]; + if (dec.compareTo(max) == 1) { + max.set(dec); + } + } + for (; i < size; i++) { + if (!batchIsNull[i]) { + final HiveDecimalWritable dec = vector[i]; + if (dec.compareTo(max) == 1) { + max.set(dec); + } + } + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public HiveDecimalWritable getDecimalGroupResult() { + return max; + } + + private static HiveDecimal MIN_VALUE = HiveDecimal.create("-99999999999999999999999999999999999999"); + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + max.set(MIN_VALUE); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java new file mode 100644 index 0000000..0f7ea04 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalMin.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.FastHiveDecimal; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal min() for a PTF group. 
+ */ +public class VectorPTFEvaluatorDecimalMin extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalMin.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected HiveDecimalWritable min; + + public VectorPTFEvaluatorDecimalMin(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + min = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Determine minimum of all non-null decimal column values; maintain isGroupResultNull. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + if (decimalColVector.noNulls) { + if (isGroupResultNull) { + min.set(decimalColVector.vector[0]); + isGroupResultNull = false; + } else { + HiveDecimalWritable repeatedMin = decimalColVector.vector[0]; + if (repeatedMin.compareTo(min) == -1) { + min.set(repeatedMin); + } + } + } + } else if (decimalColVector.noNulls) { + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + min.set(vector[0]); + isGroupResultNull = false; + } else { + final HiveDecimalWritable dec = vector[0]; + if (dec.compareTo(min) == -1) { + min.set(dec); + } + } + for (int i = 1; i < size; i++) { + final HiveDecimalWritable dec = vector[i]; + if (dec.compareTo(min) == -1) { + min.set(dec); + } + } + } else { + boolean[] batchIsNull = decimalColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + HiveDecimalWritable[] vector = 
decimalColVector.vector; + if (isGroupResultNull) { + min.set(vector[i++]); + isGroupResultNull = false; + } else { + final HiveDecimalWritable dec = vector[i++]; + if (dec.compareTo(min) == -1) { + min.set(dec); + } + } + for (; i < size; i++) { + if (!batchIsNull[i]) { + final HiveDecimalWritable dec = vector[i]; + if (dec.compareTo(min) == -1) { + min.set(dec); + } + } + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public HiveDecimalWritable getDecimalGroupResult() { + return min; + } + + private static HiveDecimal MAX_VALUE = HiveDecimal.create("99999999999999999999999999999999999999"); + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + min.set(MAX_VALUE); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java new file mode 100644 index 0000000..8300781 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDecimalSum.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates HiveDecimal sum() for a PTF group. 
+ */ +public class VectorPTFEvaluatorDecimalSum extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDecimalSum.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected HiveDecimalWritable sum; + protected HiveDecimalWritable temp; + + public VectorPTFEvaluatorDecimalSum(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + sum = new HiveDecimalWritable(); + temp = new HiveDecimalWritable(); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Sum all non-null decimal column values; maintain isGroupResultNull. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DecimalColumnVector decimalColVector = ((DecimalColumnVector) batch.cols[inputColumnNum]); + if (decimalColVector.isRepeating) { + + if (decimalColVector.noNulls) { + temp.setFromLong(batch.size); + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum.set(decimalColVector.vector[0]); + sum.mutateMultiply(temp); + isGroupResultNull = false; + } else { + temp.mutateMultiply(decimalColVector.vector[0]); + sum.mutateAdd(temp); + } + } + } else if (decimalColVector.noNulls) { + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + + // First aggregation calculation for group. 
+ sum.set(vector[0]); + isGroupResultNull = false; + } else { + sum.mutateAdd(vector[0]); + } + for (int i = 1; i < size; i++) { + sum.mutateAdd(vector[i]); + } + } else { + boolean[] batchIsNull = decimalColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + HiveDecimalWritable[] vector = decimalColVector.vector; + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum.set(vector[i++]); + isGroupResultNull = false; + } else { + sum.mutateAdd(vector[i++]); + } + for (; i < size; i++) { + if (!batchIsNull[i]) { + sum.mutateAdd(vector[i]); + } + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DECIMAL; + } + + @Override + public HiveDecimalWritable getDecimalGroupResult() { + return sum; + } + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + sum.set(HiveDecimal.ZERO);; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java new file mode 100644 index 0000000..62f7aa5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDenseRank.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +/** + * This class evaluates dense_rank() for a PTF group. + * + * Dense rank starts at 1; the same dense rank is streamed to the output column as repeated; after + * the last group row, the dense rank is incremented by 1. 
+ */ +public class VectorPTFEvaluatorDenseRank extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDenseRank.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + private int denseRank; + + public VectorPTFEvaluatorDenseRank(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + LongColumnVector longColVector = (LongColumnVector) batch.cols[outputColumnNum]; + longColVector.isRepeating = true; + longColVector.noNulls = true; + longColVector.isNull[0] = false; + longColVector.vector[0] = denseRank; + + if (isLastGroupBatch) { + denseRank++; + } + } + + public boolean streamsResult() { + // No group value. + return true; + } + + @Override + public Type getResultColumnVectorType() { + return Type.LONG; + } + + @Override + public void resetEvaluator() { + denseRank = 1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/a0df0ace/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java new file mode 100644 index 0000000..2c379d7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFEvaluatorDoubleAvg.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.ptf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; + +import com.google.common.base.Preconditions; + +/** + * This class evaluates double avg() for a PTF group. + * + * Sum up non-null column values; group result is sum / non-null count. 
+ */ +public class VectorPTFEvaluatorDoubleAvg extends VectorPTFEvaluatorBase { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorPTFEvaluatorDoubleAvg.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected boolean isGroupResultNull; + protected double sum; + private int nonNullGroupCount; + private double avg; + + public VectorPTFEvaluatorDoubleAvg(WindowFrameDef windowFrameDef, VectorExpression inputVecExpr, + int outputColumnNum) { + super(windowFrameDef, inputVecExpr, outputColumnNum); + resetEvaluator(); + } + + public void evaluateGroupBatch(VectorizedRowBatch batch, boolean isLastGroupBatch) { + evaluateInputExpr(batch); + + // Sum all non-null double column values for avg; maintain isGroupResultNull; after last row of + // last group batch compute the group avg when sum is non-null. + + // We do not filter when PTF is in reducer. + Preconditions.checkState(!batch.selectedInUse); + + final int size = batch.size; + if (size == 0) { + return; + } + DoubleColumnVector doubleColVector = ((DoubleColumnVector) batch.cols[inputColumnNum]); + if (doubleColVector.isRepeating) { + + if (doubleColVector.noNulls) { + + // We have a repeated value. The sum increases by value * batch.size. + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum = doubleColVector.vector[0] * batch.size; + isGroupResultNull = false; + } else { + sum += doubleColVector.vector[0] * batch.size; + } + nonNullGroupCount += size; + } + } else if (doubleColVector.noNulls) { + double[] vector = doubleColVector.vector; + double varSum = vector[0]; + for (int i = 1; i < size; i++) { + varSum += vector[i]; + } + nonNullGroupCount += size; + if (isGroupResultNull) { + + // First aggregation calculation for group. 
+ sum = varSum; + isGroupResultNull = false; + } else { + sum += varSum; + } + } else { + boolean[] batchIsNull = doubleColVector.isNull; + int i = 0; + while (batchIsNull[i]) { + if (++i >= size) { + return; + } + } + double[] vector = doubleColVector.vector; + double varSum = vector[i++]; + nonNullGroupCount++; + for (; i < size; i++) { + if (!batchIsNull[i]) { + varSum += vector[i]; + nonNullGroupCount++; + } + } + if (isGroupResultNull) { + + // First aggregation calculation for group. + sum = varSum; + isGroupResultNull = false; + } else { + sum += varSum; + } + } + + if (isLastGroupBatch) { + if (!isGroupResultNull) { + avg = sum / nonNullGroupCount; + } + } + } + + @Override + public boolean isGroupResultNull() { + return isGroupResultNull; + } + + @Override + public Type getResultColumnVectorType() { + return Type.DOUBLE; + } + + @Override + public double getDoubleGroupResult() { + return avg; + } + + @Override + public void resetEvaluator() { + isGroupResultNull = true; + sum = 0.0; + nonNullGroupCount = 0; + avg = 0.0; + } +} \ No newline at end of file