Repository: hive Updated Branches: refs/heads/master e1e68b29a -> 15bdce43d
HIVE-13453: Support ORDER BY and windowing clause in partitioning clause with distinct function (Reviewed by Yongzhi Chen) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bdce43 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bdce43 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bdce43 Branch: refs/heads/master Commit: 15bdce43db4624a63be1f648e46d1f2baa1c67de Parents: e1e68b2 Author: Aihua Xu <aihu...@apache.org> Authored: Fri May 6 11:00:20 2016 -0400 Committer: Aihua Xu <aihu...@apache.org> Committed: Sat May 28 20:36:59 2016 -0400 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/FunctionRegistry.java | 2 +- .../apache/hadoop/hive/ql/exec/Registry.java | 8 +- .../hadoop/hive/ql/parse/WindowingSpec.java | 14 -- .../hive/ql/plan/ptf/WindowFunctionDef.java | 2 +- .../hive/ql/udf/generic/GenericUDAFAverage.java | 68 ++++++++-- .../hive/ql/udf/generic/GenericUDAFCount.java | 57 +++++--- .../udf/generic/GenericUDAFParameterInfo.java | 7 + .../hive/ql/udf/generic/GenericUDAFSum.java | 134 +++++++++++++------ .../generic/SimpleGenericUDAFParameterInfo.java | 9 +- .../hive/ql/udf/ptf/WindowingTableFunction.java | 9 +- .../queries/clientpositive/windowing_distinct.q | 18 +++ .../clientpositive/windowing_distinct.q.out | 66 +++++++++ .../objectinspector/ObjectInspectorUtils.java | 38 ++++++ 13 files changed, 333 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index fa90242..8217ad3 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -902,7 +902,7 @@ public final class FunctionRegistry { GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - args, isDistinct, isAllColumns); + args, false, isDistinct, isAllColumns); GenericUDAFEvaluator udafEvaluator; if (udafResolver instanceof GenericUDAFResolver2) { http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java index 891514b..86df74d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java @@ -395,7 +395,7 @@ public class Registry { */ @SuppressWarnings("deprecation") public GenericUDAFEvaluator getGenericUDAFEvaluator(String name, - List<ObjectInspector> argumentOIs, boolean isDistinct, + List<ObjectInspector> argumentOIs, boolean isWindowing, boolean isDistinct, boolean isAllColumns) throws SemanticException { GenericUDAFResolver udafResolver = getGenericUDAFResolver(name); @@ -413,7 +413,7 @@ public class Registry { GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - args, isDistinct, isAllColumns); + args, isWindowing, isDistinct, isAllColumns); if (udafResolver instanceof GenericUDAFResolver2) { udafEvaluator = ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo); @@ -433,14 +433,14 @@ public class Registry { } if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) && !functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) { - return getGenericUDAFEvaluator(functionName, argumentOIs, isDistinct, isAllColumns); + return getGenericUDAFEvaluator(functionName, argumentOIs, true, isDistinct, isAllColumns); } // this must be 
lead/lag UDAF ObjectInspector args[] = new ObjectInspector[argumentOIs.size()]; GenericUDAFResolver udafResolver = info.getGenericUDAFResolver(); GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo( - argumentOIs.toArray(args), isDistinct, isAllColumns); + argumentOIs.toArray(args), true, isDistinct, isAllColumns); return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo); } http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java index 5ce7200..ef5186a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java @@ -124,9 +124,6 @@ public class WindowingSpec { WindowFunctionSpec wFn = (WindowFunctionSpec) expr; WindowSpec wdwSpec = wFn.getWindowSpec(); - // 0. Precheck supported syntax - precheckSyntax(wFn, wdwSpec); - // 1. 
For Wdw Specs that refer to Window Defns, inherit missing components if ( wdwSpec != null ) { ArrayList<String> sources = new ArrayList<String>(); @@ -153,14 +150,6 @@ public class WindowingSpec { } } - private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException { - if (wdwSpec != null ) { - if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() != null) ) { - throw new SemanticException("Function with DISTINCT cannot work with partition ORDER BY or windowing clause."); - } - } - } - private void fillInWindowSpec(String sourceId, WindowSpec dest, ArrayList<String> visited) throws SemanticException { @@ -509,9 +498,6 @@ public class WindowingSpec { if ( getOrder() == null ) { OrderSpec order = new OrderSpec(); order.prefixBy(getPartition()); - if (wFn.isDistinct) { - order.addExpressions(wFn.getArgs()); - } setOrder(order); } } http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java index ed6c671..84ac614 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java @@ -124,4 +124,4 @@ public class WindowFunctionDef extends WindowExpressionDef { this.pivotResult = pivotResult; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index 3c1ce26..6799978 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.udf.generic; import java.util.ArrayList; +import java.util.HashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject; import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; @@ -106,6 +108,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { AbstractGenericUDAFAverageEvaluator eval = (AbstractGenericUDAFAverageEvaluator) getEvaluator(paramInfo.getParameters()); eval.avgDistinct = paramInfo.isDistinct(); + eval.isWindowing = paramInfo.isWindowing(); return eval; } @@ -115,7 +118,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { public void doReset(AverageAggregationBuffer<Double> aggregation) throws HiveException { aggregation.count = 0; aggregation.sum = new Double(0); - aggregation.previousValue = null; + aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>(); } @Override @@ -145,6 +148,12 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } @Override + protected void doMergeAdd(Double sum, + ObjectInspectorObject obj) { + sum += PrimitiveObjectInspectorUtils.getDouble(obj.getValues()[0], copiedOI); + } + + @Override protected void 
doTerminatePartial(AverageAggregationBuffer<Double> aggregation) { if(partialResult[1] == null) { partialResult[1] = new DoubleWritable(0); @@ -172,6 +181,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { + // Don't use streaming for distinct cases + if (isWindowingDistinct()) { + return null; + } return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this, wFrameDef) { @@ -212,6 +225,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { public void doReset(AverageAggregationBuffer<HiveDecimal> aggregation) throws HiveException { aggregation.count = 0; aggregation.sum = HiveDecimal.ZERO; + aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>(); } @Override @@ -263,6 +277,14 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } } + + @Override + protected void doMergeAdd( + HiveDecimal sum, + ObjectInspectorObject obj) { + sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(obj.getValues()[0], copiedOI)); + } + @Override protected void doTerminatePartial(AverageAggregationBuffer<HiveDecimal> aggregation) { if(partialResult[1] == null && aggregation.sum != null) { @@ -296,6 +318,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { + // Don't use streaming for distinct cases + if (isWindowingDistinct()) { + return null; + } return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>( this, wFrameDef) { @@ -333,18 +359,18 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer { - private Object previousValue; + private HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows. 
private long count; private TYPE sum; }; @SuppressWarnings("unchecked") public static abstract class AbstractGenericUDAFAverageEvaluator<TYPE> extends GenericUDAFEvaluator { + protected boolean isWindowing; protected boolean avgDistinct; - // For PARTIAL1 and COMPLETE protected transient PrimitiveObjectInspector inputOI; - protected transient ObjectInspector copiedOI; + protected transient PrimitiveObjectInspector copiedOI; // For PARTIAL2 and FINAL private transient StructObjectInspector soi; private transient StructField countField; @@ -363,6 +389,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { PrimitiveObjectInspector inputOI, Object parameter); protected abstract void doMerge(AverageAggregationBuffer<TYPE> aggregation, Long partialCount, ObjectInspector sumFieldOI, Object partialSum); + protected abstract void doMergeAdd(TYPE sum, ObjectInspectorObject obj); protected abstract void doTerminatePartial(AverageAggregationBuffer<TYPE> aggregation); protected abstract Object doTerminate(AverageAggregationBuffer<TYPE> aggregation); protected abstract void doReset(AverageAggregationBuffer<TYPE> aggregation) throws HiveException; @@ -376,7 +403,7 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { // init input if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) { inputOI = (PrimitiveObjectInspector) parameters[0]; - copiedOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI, + copiedOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI, ObjectInspectorCopyOption.JAVA); } else { soi = (StructObjectInspector) parameters[0]; @@ -410,6 +437,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } } + protected boolean isWindowingDistinct() { + return isWindowing && avgDistinct; + } + @AggregationType(estimable = true) static class AverageAgg extends AbstractAggregationBuffer { long count; @@ -432,12 +463,15 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver { AverageAggregationBuffer<TYPE> averageAggregation = (AverageAggregationBuffer<TYPE>) aggregation; try { // Skip the same value if avgDistinct is true - if (this.avgDistinct && - ObjectInspectorUtils.compare(parameter, inputOI, averageAggregation.previousValue, copiedOI) == 0) { - return; + if (isWindowingDistinct()) { + ObjectInspectorObject obj = new ObjectInspectorObject( + ObjectInspectorUtils.copyToStandardObject(parameter, inputOI, ObjectInspectorCopyOption.JAVA), + copiedOI); + if (averageAggregation.uniqueObjects.contains(obj)) { + return; + } + averageAggregation.uniqueObjects.add(obj); } - averageAggregation.previousValue = ObjectInspectorUtils.copyToStandardObject( - parameter, inputOI, ObjectInspectorCopyOption.JAVA); doIterate(averageAggregation, inputOI, parameter); } catch (NumberFormatException e) { @@ -451,6 +485,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { @Override public Object terminatePartial(AggregationBuffer aggregation) throws HiveException { + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } + doTerminatePartial((AverageAggregationBuffer<TYPE>) aggregation); return partialResult; } @@ -459,9 +497,13 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { public void merge(AggregationBuffer aggregation, Object partial) throws HiveException { if (partial != null) { - doMerge((AverageAggregationBuffer<TYPE>)aggregation, - countFieldOI.get(soi.getStructFieldData(partial, countField)), - sumFieldOI, soi.getStructFieldData(partial, sumField)); + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + doMerge((AverageAggregationBuffer<TYPE>)aggregation, + countFieldOI.get(soi.getStructFieldData(partial, countField)), + sumFieldOI, soi.getStructFieldData(partial, sumField)); + } } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java index 2825045..d1d0131 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import java.util.HashSet; + import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -25,6 +27,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -70,6 +73,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { } GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator(); + countEvaluator.setWindowing(paramInfo.isWindowing()); countEvaluator.setCountAllColumns(paramInfo.isAllColumns()); countEvaluator.setCountDistinct(paramInfo.isDistinct()); @@ -81,6 +85,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { * */ public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator { + private boolean isWindowing = false; 
private boolean countAllColumns = false; private boolean countDistinct = false; private LongObjectInspector partialCountAggOI; @@ -99,9 +104,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { ObjectInspectorCopyOption.JAVA); } result = new LongWritable(0); + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } + public void setWindowing(boolean isWindowing) { + this.isWindowing = isWindowing; + } + private void setCountAllColumns(boolean countAllCols) { countAllColumns = countAllCols; } @@ -110,10 +120,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { this.countDistinct = countDistinct; } + private boolean isWindowingDistinct() { + return isWindowing && countDistinct; + } + /** class for storing count value. */ @AggregationType(estimable = true) static class CountAgg extends AbstractAggregationBuffer { - Object[] prevColumns = null; // Column values from previous row. Used to compare with current row for the case of COUNT(DISTINCT). + HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows long value; @Override public int estimate() { return JavaDataModel.PRIMITIVES2; } @@ -128,8 +142,8 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { @Override public void reset(AggregationBuffer agg) throws HiveException { - ((CountAgg) agg).prevColumns = null; ((CountAgg) agg).value = 0; + ((CountAgg) agg).uniqueObjects = new HashSet<ObjectInspectorObject>(); } @Override @@ -151,19 +165,16 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { } } - // Skip the counting if the values are the same for COUNT(DISTINCT) case - if (countThisRow && countDistinct) { - Object[] prevColumns = ((CountAgg) agg).prevColumns; - if (prevColumns == null) { - ((CountAgg) agg).prevColumns = new Object[parameters.length]; - } else if (ObjectInspectorUtils.compare(parameters, inputOI, prevColumns, outputOI) == 0) { - countThisRow = false; - } - - // We need to keep a copy of values from previous row. 
- if (countThisRow) { - ((CountAgg) agg).prevColumns = ObjectInspectorUtils.copyToStandardObject( - parameters, inputOI, ObjectInspectorCopyOption.JAVA); + // Skip the counting if the values are the same for windowing COUNT(DISTINCT) case + if (countThisRow && isWindowingDistinct()) { + HashSet<ObjectInspectorObject> uniqueObjs = ((CountAgg) agg).uniqueObjects; + ObjectInspectorObject obj = new ObjectInspectorObject( + ObjectInspectorUtils.copyToStandardObject(parameters, inputOI, ObjectInspectorCopyOption.JAVA), + outputOI); + if (!uniqueObjs.contains(obj)) { + uniqueObjs.add(obj); + } else { + countThisRow = false; } } @@ -177,8 +188,14 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { - long p = partialCountAggOI.get(partial); - ((CountAgg) agg).value += p; + CountAgg countAgg = (CountAgg) agg; + + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + long p = partialCountAggOI.get(partial); + countAgg.value += p; + } } } @@ -190,7 +207,11 @@ public class GenericUDAFCount implements GenericUDAFResolver2 { @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { - return terminate(agg); + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + return terminate(agg); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java index 6a62d7c..675d9f3 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java @@ -67,6 +67,13 @@ public interface GenericUDAFParameterInfo { boolean isDistinct(); /** + * The flag to indicate if the UDAF invocation was from the windowing function + * call or not. + * @return <tt>true</tt> if the UDAF invocation was from the windowing function + * call. + */ + boolean isWindowing(); + /** * Returns <tt>true</tt> if the UDAF invocation was done via the wildcard * syntax <tt>FUNCTION(*)</tt>. Note that this is provided for informational * purposes only and the function implementation is not expected to ensure http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index 7b1d6e5..f53554c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.udf.generic; +import java.util.HashSet; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -32,6 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; @@ -39,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; /** @@ -93,6 +97,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { TypeInfo[] parameters = info.getParameters(); GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) getEvaluator(parameters); + eval.setWindowing(info.isWindowing()); eval.setSumDistinct(info.isDistinct()); return eval; @@ -125,44 +130,69 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { * The base type for sum operator evaluator * */ - public static abstract class GenericUDAFSumEvaluator<ResultType> extends GenericUDAFEvaluator { + public static abstract class GenericUDAFSumEvaluator<ResultType extends Writable> extends GenericUDAFEvaluator { static abstract class SumAgg<T> extends AbstractAggregationBuffer { boolean empty; T sum; - Object previousValue = null; + HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows. 
} protected PrimitiveObjectInspector inputOI; - protected ObjectInspector outputOI; + protected PrimitiveObjectInspector outputOI; protected ResultType result; + protected boolean isWindowing; protected boolean sumDistinct; - public boolean sumDistinct() { - return sumDistinct; + public void setWindowing(boolean isWindowing) { + this.isWindowing = isWindowing; } public void setSumDistinct(boolean sumDistinct) { this.sumDistinct = sumDistinct; } + protected boolean isWindowingDistinct() { + return isWindowing && sumDistinct; + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + return terminate(agg); + } + } + /** - * Check if the input object is the same as the previous one for the case of - * SUM(DISTINCT). + * Check if the input object is eligible to contribute to the sum. If it's null + * or the same value as the previous one for the case of SUM(DISTINCT). Then + * skip it. * @param input the input object - * @return True if sumDistinct is false or the input is different from the previous object + * @return True if sumDistinct is false or the non-null input is different from the previous object */ - protected boolean checkDistinct(SumAgg agg, Object input) { - if (this.sumDistinct && - ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, outputOI) == 0) { + protected boolean isEligibleValue(SumAgg agg, Object input) { + if (input == null) { return false; } - agg.previousValue = ObjectInspectorUtils.copyToStandardObject( - input, inputOI, ObjectInspectorCopyOption.JAVA); - return true; - } + if (isWindowingDistinct()) { + HashSet<ObjectInspectorObject> uniqueObjs = agg.uniqueObjects; + ObjectInspectorObject obj = input instanceof ObjectInspectorObject ? 
+ (ObjectInspectorObject)input : + new ObjectInspectorObject( + ObjectInspectorUtils.copyToStandardObject(input, inputOI, ObjectInspectorCopyOption.JAVA), + outputOI); + if (!uniqueObjs.contains(obj)) { + uniqueObjs.add(obj); + return true; + } + return false; + } + return true; + } } /** @@ -177,7 +207,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { super.init(m, parameters); result = new HiveDecimalWritable(HiveDecimal.ZERO); inputOI = (PrimitiveObjectInspector) parameters[0]; - outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI, + outputOI = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(inputOI, ObjectInspectorCopyOption.JAVA); // The output precision is 10 greater than the input which should cover at least // 10b rows. The scale is the same as the input. @@ -208,6 +238,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg; bdAgg.empty = true; bdAgg.sum = HiveDecimal.ZERO; + bdAgg.uniqueObjects = new HashSet<ObjectInspectorObject>(); } boolean warned = false; @@ -216,8 +247,10 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 1); try { - if (checkDistinct((SumAgg) agg, parameters[0])) { - merge(agg, parameters[0]); + if (isEligibleValue((SumHiveDecimalAgg) agg, parameters[0])) { + ((SumHiveDecimalAgg)agg).empty = false; + ((SumHiveDecimalAgg)agg).sum = ((SumHiveDecimalAgg)agg).sum.add( + PrimitiveObjectInspectorUtils.getHiveDecimal(parameters[0], inputOI)); } } catch (NumberFormatException e) { if (!warned) { @@ -232,11 +265,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public Object terminatePartial(AggregationBuffer agg) throws HiveException { - return terminate(agg); - } - - @Override public void merge(AggregationBuffer agg, Object partial) throws 
HiveException { if (partial != null) { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) agg; @@ -245,7 +273,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } myagg.empty = false; - myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI)); + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + myagg.sum = myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI)); + } } } @@ -261,6 +293,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { + // Don't use streaming for distinct cases + if (sumDistinct) { + return null; + } + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>( this, wFrameDef) { @@ -301,7 +338,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { super.init(m, parameters); result = new DoubleWritable(0); inputOI = (PrimitiveObjectInspector) parameters[0]; - outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI, + outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI, ObjectInspectorCopyOption.JAVA); return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; } @@ -325,6 +362,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { SumDoubleAgg myagg = (SumDoubleAgg) agg; myagg.empty = true; myagg.sum = 0.0; + myagg.uniqueObjects = new HashSet<ObjectInspectorObject>(); } boolean warned = false; @@ -333,8 +371,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 1); try { - if (checkDistinct((SumAgg) agg, parameters[0])) { - merge(agg, parameters[0]); + if (isEligibleValue((SumDoubleAgg) agg, parameters[0])) { + 
((SumDoubleAgg)agg).empty = false; + ((SumDoubleAgg)agg).sum += PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI); } } catch (NumberFormatException e) { if (!warned) { @@ -349,16 +388,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public Object terminatePartial(AggregationBuffer agg) throws HiveException { - return terminate(agg); - } - - @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { SumDoubleAgg myagg = (SumDoubleAgg) agg; myagg.empty = false; - myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI); + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI); + } } } @@ -374,6 +412,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { + // Don't use streaming for distinct cases + if (sumDistinct) { + return null; + } + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this, wFrameDef) { @@ -415,7 +458,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { super.init(m, parameters); result = new LongWritable(0); inputOI = (PrimitiveObjectInspector) parameters[0]; - outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI, + outputOI = (PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI, ObjectInspectorCopyOption.JAVA); return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } @@ -439,6 +482,7 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { SumLongAgg myagg = (SumLongAgg) agg; myagg.empty = true; myagg.sum = 0L; + myagg.uniqueObjects = new HashSet<ObjectInspectorObject>(); } private boolean warned = false; @@ -447,8 +491,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver { public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 1); try { - if (checkDistinct((SumAgg) agg, parameters[0])) { - merge(agg, parameters[0]); + if (isEligibleValue((SumLongAgg) agg, parameters[0])) { + ((SumLongAgg)agg).empty = false; + ((SumLongAgg)agg).sum += PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI); } } catch (NumberFormatException e) { if (!warned) { @@ -460,16 +505,15 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public Object terminatePartial(AggregationBuffer agg) throws HiveException { - return terminate(agg); - } - - @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { SumLongAgg myagg = (SumLongAgg) agg; - myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI); myagg.empty = false; + if (isWindowingDistinct()) { + throw new HiveException("Distinct windowing UDAF doesn't support merge and terminatePartial"); + } else { + myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI); + } } } @@ -485,6 +529,11 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { + // Don't use streaming for distinct cases + if (isWindowingDistinct()) { + return null; + } + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this, wFrameDef) { @@ -509,7 +558,6 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; return myagg.empty ? 
null : new Long(myagg.sum); } - }; } } http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java index 1a1b570..728964d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java @@ -29,12 +29,14 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo { private final ObjectInspector[] parameters; + private final boolean isWindowing; private final boolean distinct; private final boolean allColumns; - public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean distinct, + public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { this.parameters = params; + this.isWindowing = isWindowing; this.distinct = distinct; this.allColumns = allColumns; } @@ -63,4 +65,9 @@ public class SimpleGenericUDAFParameterInfo implements GenericUDAFParameterInfo public boolean isAllColumns() { return allColumns; } + + @Override + public boolean isWindowing() { + return isWindowing; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index 858b47a..b89c14e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer; import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; @@ -392,8 +393,6 @@ public class WindowingTableFunction extends TableFunctionEvaluator { } streamingState.rollingPart.append(row); - row = streamingState.rollingPart - .getAt(streamingState.rollingPart.size() - 1); WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); @@ -408,7 +407,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator { } } - if (fnEval instanceof ISupportStreamingModeForWindowing) { + if (fnEval != null && + fnEval instanceof ISupportStreamingModeForWindowing) { fnEval.aggregate(streamingState.aggBuffers[i], streamingState.funcArgs[i]); Object out = ((ISupportStreamingModeForWindowing) fnEval) .getNextResult(streamingState.aggBuffers[i]); @@ -472,7 +472,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator { GenericUDAFEvaluator fnEval = wFn.getWFnEval(); int numRowsRemaining = wFn.getWindowFrame().getEnd().getRelativeOffset(); - if (fnEval instanceof ISupportStreamingModeForWindowing) { + if (fnEval != null && + fnEval instanceof ISupportStreamingModeForWindowing) { fnEval.terminate(streamingState.aggBuffers[i]); WindowingFunctionInfoHelper wFnInfo = getWindowingFunctionInfoHelper(wFn.getName()); http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/queries/clientpositive/windowing_distinct.q 
---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q index bb192a7..6b49978 100644 --- a/ql/src/test/queries/clientpositive/windowing_distinct.q +++ b/ql/src/test/queries/clientpositive/windowing_distinct.q @@ -44,3 +44,21 @@ SELECT AVG(DISTINCT t) OVER (PARTITION BY index), AVG(DISTINCT ts) OVER (PARTITION BY index), AVG(DISTINCT dec) OVER (PARTITION BY index) FROM windowing_distinct; + +-- count +select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between 1 following and 2 following), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct; + +-- sum +select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between 1 following and 2 following), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct; + +-- avg +select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between 1 following and 2 following), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct; 
http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/results/clientpositive/windowing_distinct.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out index 074a594..86d1cdd 100644 --- a/ql/src/test/results/clientpositive/windowing_distinct.q.out +++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out @@ -128,3 +128,69 @@ POSTHOOK: Input: default@windowing_distinct 117.5 38.71 NULL NULL 1.362157918703306E9 34.5000 117.5 38.71 NULL NULL 1.362157918703306E9 34.5000 117.5 38.71 NULL NULL 1.362157918703306E9 34.5000 +PREHOOK: query: -- count +select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between 1 following and 2 following), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct +PREHOOK: type: QUERY +PREHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +POSTHOOK: query: -- count +select index, f, count(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + count(distinct f) over (partition by index order by f rows between 1 following and 2 following), + count(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct +POSTHOOK: type: QUERY +POSTHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +1 26.43 0 0 2 1 +1 26.43 1 1 1 2 +1 96.91 1 1 0 2 +2 13.01 0 0 1 2 +2 74.72 1 1 1 2 +2 74.72 2 2 0 2 +PREHOOK: query: -- sum +select 
index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between 1 following and 2 following), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct +PREHOOK: type: QUERY +PREHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +POSTHOOK: query: -- sum +select index, f, sum(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + sum(distinct f) over (partition by index order by f rows between 1 following and 2 following), + sum(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct +POSTHOOK: type: QUERY +POSTHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +1 26.43 NULL NULL 123.34000396728516 26.43000030517578 +1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 123.34000396728516 +1 96.91 26.43000030517578 26.43000030517578 NULL 123.34000396728516 +2 13.01 NULL NULL 74.72000122070312 87.73000144958496 +2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 87.73000144958496 +2 74.72 87.73000144958496 87.73000144958496 NULL 87.73000144958496 +PREHOOK: query: -- avg +select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between 1 following and 2 following), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct 
+PREHOOK: type: QUERY +PREHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +POSTHOOK: query: -- avg +select index, f, avg(distinct f) over (partition by index order by f rows between 2 preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 preceding), + avg(distinct f) over (partition by index order by f rows between 1 following and 2 following), + avg(distinct f) over (partition by index order by f rows between unbounded preceding and 1 following) from windowing_distinct +POSTHOOK: type: QUERY +POSTHOOK: Input: default@windowing_distinct +#### A masked pattern was here #### +1 26.43 NULL NULL 61.67000198364258 26.43000030517578 +1 26.43 26.43000030517578 26.43000030517578 96.91000366210938 61.67000198364258 +1 96.91 26.43000030517578 26.43000030517578 NULL 61.67000198364258 +2 13.01 NULL NULL 74.72000122070312 43.86500072479248 +2 74.72 13.010000228881836 13.010000228881836 74.72000122070312 43.86500072479248 +2 74.72 43.86500072479248 43.86500072479248 NULL 43.86500072479248 http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java index c58e8ed..1ac72c6 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -117,6 +117,44 @@ public final class ObjectInspectorUtils { } /** + * Wraps Hive objects so they can be stored as keys in a HashMap or HashSet. + * Equality and hash codes are computed through the associated ObjectInspectors.
+ * + */ + public static class ObjectInspectorObject { + private final Object[] objects; + private final ObjectInspector[] oi; + + public ObjectInspectorObject(Object object, ObjectInspector oi) { + this.objects = new Object[] { object }; + this.oi = new ObjectInspector[] { oi }; + } + + public ObjectInspectorObject(Object[] objects, ObjectInspector[] oi) { + this.objects = objects; + this.oi = oi; + } + + public Object[] getValues() { + return objects; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || obj.getClass() != this.getClass()) { return false; } + + ObjectInspectorObject comparedObject = (ObjectInspectorObject)obj; + return ObjectInspectorUtils.compare(objects, oi, comparedObject.objects, comparedObject.oi) == 0; + } + + @Override + public int hashCode() { + return ObjectInspectorUtils.getBucketHashCode(objects, oi); + } + } + + /** * Calculates the hash code for array of Objects that contains writables. This is used * to work around the buggy Hadoop DoubleWritable hashCode implementation. This should * only be used for process-local hash codes; don't replace stored hash codes like bucketing.