Repository: hive Updated Branches: refs/heads/master c98ac7ec7 -> 154c662da
HIVE-10643 : Refactoring Windowing for sum() to pass WindowFrameDef instead of two numbers (1 for number of preceding and 1 for number of following) (Aihua Xu via Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/154c662d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/154c662d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/154c662d Branch: refs/heads/master Commit: 154c662dab792bd7a8b617b84b6d5c62237788be Parents: c98ac7e Author: Aihua Xu <(aihu...@gmail.com)> Authored: Mon May 11 13:03:00 2015 -0700 Committer: Ashutosh Chauhan <hashut...@apache.org> Committed: Tue May 12 09:06:04 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/parse/PTFTranslator.java | 5 +- .../hadoop/hive/ql/plan/ptf/BoundaryDef.java | 5 ++ .../hadoop/hive/ql/plan/ptf/WindowFrameDef.java | 20 +++-- .../hive/ql/udf/generic/GenericUDAFAverage.java | 43 ++++------- .../ql/udf/generic/GenericUDAFFirstValue.java | 20 ++--- .../ql/udf/generic/GenericUDAFLastValue.java | 18 ++--- .../hive/ql/udf/generic/GenericUDAFMax.java | 20 ++--- .../generic/GenericUDAFStreamingEvaluator.java | 77 +++++++++++++++----- .../hive/ql/udf/generic/GenericUDAFSum.java | 41 +++-------- .../hadoop/hive/ql/udaf/TestStreamingSum.java | 6 +- 10 files changed, 136 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java index 00b43c6..d7f1c7f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java @@ -538,10 +538,7 @@ public class PTFTranslator { "Window range invalid, start boundary is greater than end boundary: %s", spec)); } - WindowFrameDef wfDef = new WindowFrameDef(); - wfDef.setStart(translate(inpShape, s)); - wfDef.setEnd(translate(inpShape, e)); - return wfDef; + return new WindowFrameDef(translate(inpShape, s), translate(inpShape, e)); } private BoundaryDef translate(ShapeDetails inpShape, BoundarySpec bndSpec) http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java index f692fa2..eeb094c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan.ptf; +import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; public abstract class BoundaryDef { @@ -33,6 +34,10 @@ public abstract class BoundaryDef { public abstract int getAmt(); + public boolean isUnbounded() { + return this.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT; + } + @Override public String toString() { return direction == null ? 
"" : http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java index e08bdd5..d153b08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java @@ -23,20 +23,28 @@ public class WindowFrameDef { private BoundaryDef start; private BoundaryDef end; + public WindowFrameDef(BoundaryDef start, BoundaryDef end) { + this.start = start; + this.end = end; + } public BoundaryDef getStart() { return start; } - public void setStart(BoundaryDef start) { - this.start = start; - } - public BoundaryDef getEnd() { return end; } - public void setEnd(BoundaryDef end) { - this.end = end; + public boolean isStartUnbounded() { + return start.isUnbounded(); + } + + public boolean isEndUnbounded() { + return end.isUnbounded(); + } + + public int getWindowSize() { + return end.getAmt() + start.getAmt() + 1; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java index 12a327f..9f78449 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java @@ -157,13 +157,9 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } @Override - public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) { - BoundaryDef start = wFrmDef.getStart(); - BoundaryDef end = wFrmDef.getEnd(); - - return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this, - start.getAmt(), end.getAmt()) { + return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Object[]>(this, wFrameDef) { @Override protected DoubleWritable getNextResult( @@ -172,14 +168,12 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { AverageAggregationBuffer<Double> myagg = (AverageAggregationBuffer<Double>) ss.wrappedBuf; Double r = myagg.count == 0 ? null : myagg.sum; long cnt = myagg.count; - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { - Object[] o = ss.intermediateVals.remove(0); - if (o != null) { - Double d = (Double) o[0]; - r = r == null ? null : r - d; - cnt = cnt - ((Long) o[1]); - } + + Object[] o = ss.retrieveNextIntermediateValue(); + if (o != null) { + Double d = (Double) o[0]; + r = r == null ? null : r - d; + cnt = cnt - ((Long) o[1]); } return r == null ? null : new DoubleWritable(r / cnt); @@ -287,13 +281,10 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { } @Override - public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { - - BoundaryDef start = wFrmDef.getStart(); - BoundaryDef end = wFrmDef.getEnd(); + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>( - this, start.getAmt(), end.getAmt()) { + this, wFrameDef) { @Override protected HiveDecimalWritable getNextResult( @@ -302,14 +293,12 @@ public class GenericUDAFAverage extends AbstractGenericUDAFResolver { AverageAggregationBuffer<HiveDecimal> myagg = (AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf; HiveDecimal r = myagg.count == 0 ? 
null : myagg.sum; long cnt = myagg.count; - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { - Object[] o = ss.intermediateVals.remove(0); - if (o != null) { - HiveDecimal d = (HiveDecimal) o[0]; - r = r == null ? null : r.subtract(d); - cnt = cnt - ((Long) o[1]); - } + + Object[] o = ss.retrieveNextIntermediateValue(); + if (o != null) { + HiveDecimal d = (HiveDecimal) o[0]; + r = r == null ? null : r.subtract(d); + cnt = cnt - ((Long) o[1]); } return r == null ? null : new HiveDecimalWritable( http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java index f679387..dd9eaf3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java @@ -178,8 +178,8 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { private final Deque<ValIndexPair> valueChain; - public State(int numPreceding, int numFollowing, AggregationBuffer buf) { - super(numPreceding, numFollowing, buf); + public State(AggregationBuffer buf) { + super(buf); valueChain = new ArrayDeque<ValIndexPair>(numPreceding + numFollowing + 1); } @@ -225,7 +225,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new State(numPreceding, numFollowing, underlying); + return new State(underlying); } protected ObjectInspector inputOI() { @@ -252,7 +252,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver 
{ * add row to chain. except in case of UNB preceding: - only 1 firstVal * needs to be tracked. */ - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || s.valueChain.isEmpty()) { + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || s.valueChain.isEmpty()) { /* * add value to chain if it is not null or if skipNulls is false. */ @@ -261,7 +261,7 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { } } - if (s.numRows >= (s.numFollowing)) { + if (s.numRows >= numFollowing) { /* * if skipNulls is true and there are no rows in valueChain => all rows * in partition are null so far; so add null in o/p @@ -276,8 +276,8 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { if (s.valueChain.size() > 0) { int fIdx = (Integer) s.valueChain.getFirst().idx; - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows > fIdx + s.numPreceding + s.numFollowing) { + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + numPreceding + numFollowing) { s.valueChain.removeFirst(); } } @@ -288,13 +288,13 @@ public class GenericUDAFFirstValue extends AbstractGenericUDAFResolver { State s = (State) agg; ValIndexPair r = s.valueChain.size() == 0 ? null : s.valueChain.getFirst(); - for (int i = 0; i < s.numFollowing; i++) { + for (int i = 0; i < numFollowing; i++) { s.results.add(r == null ? null : r.val); s.numRows++; if (r != null) { int fIdx = (Integer) r.idx; - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows > fIdx + s.numPreceding + s.numFollowing + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + numPreceding + numFollowing && !s.valueChain.isEmpty()) { s.valueChain.removeFirst(); r = !s.valueChain.isEmpty() ? 
s.valueChain.getFirst() : r; http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java index e099154..3ed6de7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java @@ -154,8 +154,8 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver { private Object lastValue; private int lastIdx; - public State(int numPreceding, int numFollowing, AggregationBuffer buf) { - super(numPreceding, numFollowing, buf); + public State(AggregationBuffer buf) { + super(buf); lastValue = null; lastIdx = -1; } @@ -192,7 +192,7 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver { @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new State(numPreceding, numFollowing, underlying); + return new State(underlying); } protected ObjectInspector inputOI() { @@ -219,14 +219,14 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver { s.lastValue = o; s.lastIdx = s.numRows; } else if (lb.skipNulls && s.lastIdx != -1) { - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) { + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > s.lastIdx + numPreceding + numFollowing) { s.lastValue = null; s.lastIdx = -1; } } - if (s.numRows >= (s.numFollowing)) { + if (s.numRows >= (numFollowing)) { s.results.add(s.lastValue); } s.numRows++; @@ -238,14 +238,14 @@ public class GenericUDAFLastValue extends AbstractGenericUDAFResolver { LastValueBuffer lb 
= (LastValueBuffer) s.wrappedBuf; if (lb.skipNulls && s.lastIdx != -1) { - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) { + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > s.lastIdx + numPreceding + numFollowing) { s.lastValue = null; s.lastIdx = -1; } } - for (int i = 0; i < s.numFollowing; i++) { + for (int i = 0; i < numFollowing; i++) { s.results.add(s.lastValue); } http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java index a153818..6b7808a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java @@ -166,8 +166,8 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState { private final Deque<Object[]> maxChain; - public State(int numPreceding, int numFollowing, AggregationBuffer buf) { - super(numPreceding, numFollowing, buf); + public State(AggregationBuffer buf) { + super(buf); maxChain = new ArrayDeque<Object[]>(numPreceding + numFollowing + 1); } @@ -209,7 +209,7 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new State(numPreceding, numFollowing, underlying); + return new State(underlying); } protected ObjectInspector inputOI() { @@ -240,21 +240,21 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { * to be tracked. - current max will never become out of range. 
It can * only be replaced by a larger max. */ - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || s.maxChain.isEmpty()) { o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o, inputOI(), ObjectInspectorCopyOption.JAVA); s.maxChain.addLast(new Object[] { o, s.numRows }); } - if (s.numRows >= (s.numFollowing)) { + if (s.numRows >= numFollowing) { s.results.add(s.maxChain.getFirst()[0]); } s.numRows++; int fIdx = (Integer) s.maxChain.getFirst()[1]; - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows > fIdx + s.numPreceding + s.numFollowing) { + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows > fIdx + numPreceding + numFollowing) { s.maxChain.removeFirst(); } } @@ -279,12 +279,12 @@ public class GenericUDAFMax extends AbstractGenericUDAFResolver { State s = (State) agg; Object[] r = s.maxChain.getFirst(); - for (int i = 0; i < s.numFollowing; i++) { + for (int i = 0; i < numFollowing; i++) { s.results.add(r[0]); s.numRows++; int fIdx = (Integer) r[1]; - if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && s.numRows - s.numFollowing + i > fIdx + s.numPreceding + if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT + && s.numRows - numFollowing + i > fIdx + numPreceding && !s.maxChain.isEmpty()) { s.maxChain.removeFirst(); r = !s.maxChain.isEmpty() ? 
s.maxChain.getFirst() : r; http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java index d68c085..578c356 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec; +import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -32,28 +32,44 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends GenericUDAFEvaluator implements ISupportStreamingModeForWindowing { protected final GenericUDAFEvaluator wrappedEval; + protected final WindowFrameDef wFrameDef; + protected final int numPreceding; protected final int numFollowing; + /** + * @param wrappedEval + * @param numPreceding + * @param numFollowing + * @deprecated + */ public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval, int numPreceding, int numFollowing) { this.wrappedEval = wrappedEval; + this.wFrameDef = null; + this.mode = wrappedEval.mode; + this.numPreceding = numPreceding; this.numFollowing = numFollowing; + } + + public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval, + WindowFrameDef wFrameDef) { + this.wrappedEval = wrappedEval; + this.wFrameDef = wFrameDef; this.mode = wrappedEval.mode; + + 
this.numPreceding = -1; + this.numFollowing = -1; } class StreamingState extends AbstractAggregationBuffer { final AggregationBuffer wrappedBuf; - final int numPreceding; - final int numFollowing; final List<T1> results; - int numRows; + int numRows; // Number of rows processed in the partition. - StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) { + StreamingState(AggregationBuffer buf) { this.wrappedBuf = buf; - this.numPreceding = numPreceding; - this.numFollowing = numFollowing; results = new ArrayList<T1>(); numRows = 0; } @@ -105,18 +121,16 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends public static abstract class SumAvgEnhancer<T1, T2> extends GenericUDAFStreamingEvaluator<T1> { - public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding, - int numFollowing) { - super(wrappedEval, numPreceding, numFollowing); + public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, WindowFrameDef wFrameDef) { + super(wrappedEval, wFrameDef); } class SumAvgStreamingState extends StreamingState { final List<T2> intermediateVals; - SumAvgStreamingState(int numPreceding, int numFollowing, - AggregationBuffer buf) { - super(numPreceding, numFollowing, buf); + SumAvgStreamingState(AggregationBuffer buf) { + super(buf); intermediateVals = new ArrayList<T2>(); } @@ -129,7 +143,7 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends if (underlying == -1) { return -1; } - if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) { + if (wFrameDef.isStartUnbounded()) { return -1; } /* @@ -138,7 +152,7 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends * of underlying * wdwSz sz of intermediates = sz of underlying * wdwSz */ - int wdwSz = numPreceding + numFollowing + 1; + int wdwSz = wFrameDef.getWindowSize(); return underlying + (underlying * wdwSz) + (underlying * wdwSz) + (3 * JavaDataModel.PRIMITIVES1); } @@ -147,12 +161,33 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends 
intermediateVals.clear(); super.reset(); } + + /** + * Once the number of rows processed is at least the size of the FOLLOWING window, + * we can generate a PTF result for a previous row when a new row gets processed. + * @return true if a result for an earlier row can be emitted now + */ + public boolean hasResultReady() { + return this.numRows >= wFrameDef.getEnd().getAmt(); + } + + /** + * Retrieves and removes the next stored intermediate value needed to compute the result for the next available row; returns null when no stored value applies. + */ + public T2 retrieveNextIntermediateValue() { + if (!wFrameDef.getStart().isUnbounded() + && (this.numRows - wFrameDef.getEnd().getAmt()) >= (wFrameDef.getStart().getAmt() + 1)) { + return this.intermediateVals.remove(0); + } + + return null; + } } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer(); - return new SumAvgStreamingState(numPreceding, numFollowing, underlying); + return new SumAvgStreamingState(underlying); } @Override @@ -161,11 +196,11 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends SumAvgStreamingState ss = (SumAvgStreamingState) agg; wrappedEval.iterate(ss.wrappedBuf, parameters); - - if (ss.numRows >= ss.numFollowing) { + // Generate the result for an earlier row once all rows in its window have been processed.
+ if (ss.hasResultReady()) { ss.results.add(getNextResult(ss)); } - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) { + if (!wFrameDef.isStartUnbounded()) { ss.intermediateVals.add(getCurrentIntermediateResult(ss)); } @@ -177,10 +212,12 @@ public abstract class GenericUDAFStreamingEvaluator<T1> extends SumAvgStreamingState ss = (SumAvgStreamingState) agg; Object o = wrappedEval.terminate(ss.wrappedBuf); - for (int i = 0; i < ss.numFollowing; i++) { + // After all rows are processed, generate results for the remaining rows whose results have not been generated yet + for (int i = 0; i < wFrameDef.getEnd().getAmt(); i++) { ss.results.add(getNextResult(ss)); ss.numRows++; } + return o; } http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java index ffb7093..5a5846e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java @@ -205,13 +205,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { - - BoundaryDef start = wFrmDef.getStart(); - BoundaryDef end = wFrmDef.getEnd(); - + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>( - this, start.getAmt(), end.getAmt()) { + this, wFrameDef) { @Override protected HiveDecimalWritable getNextResult( @@ -219,10 +215,8 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { throws HiveException { SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf; HiveDecimal r = myagg.empty ?
null : myagg.sum; - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { - HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0); - d = d == null ? HiveDecimal.ZERO : d; + HiveDecimal d = ss.retrieveNextIntermediateValue(); + if (d != null ) { r = r == null ? null : r.subtract(d); } @@ -325,12 +319,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { - BoundaryDef start = wFrmDef.getStart(); - BoundaryDef end = wFrmDef.getEnd(); - + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, Double>(this, - start.getAmt(), end.getAmt()) { + wFrameDef) { @Override protected DoubleWritable getNextResult( @@ -338,10 +329,8 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { throws HiveException { SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf; Double r = myagg.empty ? null : myagg.sum; - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { - Double d = (Double) ss.intermediateVals.remove(0); - d = d == null ? 0.0 : d; + Double d = ss.retrieveNextIntermediateValue(); + if (d != null) { r = r == null ? 
null : r - d; } @@ -442,13 +431,9 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { } @Override - public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) { - - BoundaryDef start = wFrmDef.getStart(); - BoundaryDef end = wFrmDef.getEnd(); - + public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) { return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, Long>(this, - start.getAmt(), end.getAmt()) { + wFrameDef) { @Override protected LongWritable getNextResult( @@ -456,10 +441,8 @@ public class GenericUDAFSum extends AbstractGenericUDAFResolver { throws HiveException { SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf; Long r = myagg.empty ? null : myagg.sum; - if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT - && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) { - Long d = (Long) ss.intermediateVals.remove(0); - d = d == null ? 0 : d; + Long d = ss.retrieveNextIntermediateValue(); + if (d != null) { r = r == null ? 
null : r - d; } http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java index a331e66..88cafc0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java @@ -50,7 +50,6 @@ import org.junit.Test; public class TestStreamingSum { public static WindowFrameDef wdwFrame(int p, int f) { - WindowFrameDef wFrmDef = new WindowFrameDef(); BoundaryDef start, end; if (p == 0) { start = new CurrentRowDef(); @@ -69,9 +68,8 @@ public class TestStreamingSum { endR.setAmt(f); end = endR; } - wFrmDef.setStart(start); - wFrmDef.setEnd(end); - return wFrmDef; + + return new WindowFrameDef(start, end); } public void sumDouble(Iterator<Double> inVals, int inSz, int numPreceding,