Repository: hive
Updated Branches:
  refs/heads/master c98ac7ec7 -> 154c662da


HIVE-10643 : Refactoring Windowing for sum() to pass WindowFrameDef instead of 
two numbers (1 for number of preceding and 1 for number of following) (Aihua Xu 
via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/154c662d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/154c662d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/154c662d

Branch: refs/heads/master
Commit: 154c662dab792bd7a8b617b84b6d5c62237788be
Parents: c98ac7e
Author: Aihua Xu <(aihu...@gmail.com)>
Authored: Mon May 11 13:03:00 2015 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Tue May 12 09:06:04 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/parse/PTFTranslator.java     |  5 +-
 .../hadoop/hive/ql/plan/ptf/BoundaryDef.java    |  5 ++
 .../hadoop/hive/ql/plan/ptf/WindowFrameDef.java | 20 +++--
 .../hive/ql/udf/generic/GenericUDAFAverage.java | 43 ++++-------
 .../ql/udf/generic/GenericUDAFFirstValue.java   | 20 ++---
 .../ql/udf/generic/GenericUDAFLastValue.java    | 18 ++---
 .../hive/ql/udf/generic/GenericUDAFMax.java     | 20 ++---
 .../generic/GenericUDAFStreamingEvaluator.java  | 77 +++++++++++++++-----
 .../hive/ql/udf/generic/GenericUDAFSum.java     | 41 +++--------
 .../hadoop/hive/ql/udaf/TestStreamingSum.java   |  6 +-
 10 files changed, 136 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
index 00b43c6..d7f1c7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
@@ -538,10 +538,7 @@ public class PTFTranslator {
           "Window range invalid, start boundary is greater than end boundary: 
%s", spec));
     }
 
-    WindowFrameDef wfDef = new WindowFrameDef();
-    wfDef.setStart(translate(inpShape, s));
-    wfDef.setEnd(translate(inpShape, e));
-    return wfDef;
+    return new WindowFrameDef(translate(inpShape, s), translate(inpShape, e));
   }
 
   private BoundaryDef translate(ShapeDetails inpShape, BoundarySpec bndSpec)

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
index f692fa2..eeb094c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan.ptf;
 
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
 
 public abstract class BoundaryDef {
@@ -33,6 +34,10 @@ public abstract class BoundaryDef {
 
   public abstract int getAmt();
 
+  public boolean isUnbounded() {
+    return this.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT;
+  }
+
   @Override
   public String toString() {
     return direction == null ? "" :

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
index e08bdd5..d153b08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
@@ -23,20 +23,28 @@ public class WindowFrameDef {
   private BoundaryDef start;
   private BoundaryDef end;
 
+  public WindowFrameDef(BoundaryDef start, BoundaryDef end) {
+    this.start = start;
+    this.end = end;
+  }
   public BoundaryDef getStart() {
     return start;
   }
 
-  public void setStart(BoundaryDef start) {
-    this.start = start;
-  }
-
   public BoundaryDef getEnd() {
     return end;
   }
 
-  public void setEnd(BoundaryDef end) {
-    this.end = end;
+  public boolean isStartUnbounded() {
+    return start.isUnbounded();
+  }
+
+  public boolean isEndUnbounded() {
+    return end.isUnbounded();
+  }
+
+  public int getWindowSize() {
+    return end.getAmt() + start.getAmt() + 1;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 12a327f..9f78449 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -157,13 +157,9 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
 
-      BoundaryDef start = wFrmDef.getStart();
-      BoundaryDef end = wFrmDef.getEnd();
-
-      return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, 
Object[]>(this,
-          start.getAmt(), end.getAmt()) {
+      return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, 
Object[]>(this, wFrameDef) {
 
         @Override
         protected DoubleWritable getNextResult(
@@ -172,14 +168,12 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
           AverageAggregationBuffer<Double> myagg = 
(AverageAggregationBuffer<Double>) ss.wrappedBuf;
           Double r = myagg.count == 0 ? null : myagg.sum;
           long cnt = myagg.count;
-          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-            Object[] o = ss.intermediateVals.remove(0);
-            if (o != null) {
-              Double d = (Double) o[0];
-              r = r == null ? null : r - d;
-              cnt = cnt - ((Long) o[1]);
-            }
+
+          Object[] o = ss.retrieveNextIntermediateValue();
+          if (o != null) {
+            Double d = (Double) o[0];
+            r = r == null ? null : r - d;
+            cnt = cnt - ((Long) o[1]);
           }
 
           return r == null ? null : new DoubleWritable(r / cnt);
@@ -287,13 +281,10 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-
-      BoundaryDef start = wFrmDef.getStart();
-      BoundaryDef end = wFrmDef.getEnd();
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
 
       return new 
GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
-          this, start.getAmt(), end.getAmt()) {
+          this, wFrameDef) {
 
         @Override
         protected HiveDecimalWritable getNextResult(
@@ -302,14 +293,12 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
           AverageAggregationBuffer<HiveDecimal> myagg = 
(AverageAggregationBuffer<HiveDecimal>) ss.wrappedBuf;
           HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
           long cnt = myagg.count;
-          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-            Object[] o = ss.intermediateVals.remove(0);
-            if (o != null) {
-              HiveDecimal d = (HiveDecimal) o[0];
-              r = r == null ? null : r.subtract(d);
-              cnt = cnt - ((Long) o[1]);
-            }
+
+          Object[] o = ss.retrieveNextIntermediateValue();
+          if (o != null) {
+            HiveDecimal d = (HiveDecimal) o[0];
+            r = r == null ? null : r.subtract(d);
+            cnt = cnt - ((Long) o[1]);
           }
 
           return r == null ? null : new HiveDecimalWritable(

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index f679387..dd9eaf3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -178,8 +178,8 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
 
       private final Deque<ValIndexPair> valueChain;
 
-      public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-        super(numPreceding, numFollowing, buf);
+      public State(AggregationBuffer buf) {
+        super(buf);
         valueChain = new ArrayDeque<ValIndexPair>(numPreceding + numFollowing 
+ 1);
       }
 
@@ -225,7 +225,7 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-      return new State(numPreceding, numFollowing, underlying);
+      return new State(underlying);
     }
 
     protected ObjectInspector inputOI() {
@@ -252,7 +252,7 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
        * add row to chain. except in case of UNB preceding: - only 1 firstVal
        * needs to be tracked.
        */
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || 
s.valueChain.isEmpty()) {
+      if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || 
s.valueChain.isEmpty()) {
         /*
          * add value to chain if it is not null or if skipNulls is false.
          */
@@ -261,7 +261,7 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
         }
       }
 
-      if (s.numRows >= (s.numFollowing)) {
+      if (s.numRows >= numFollowing) {
         /*
          * if skipNulls is true and there are no rows in valueChain => all rows
          * in partition are null so far; so add null in o/p
@@ -276,8 +276,8 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
 
       if (s.valueChain.size() > 0) {
         int fIdx = (Integer) s.valueChain.getFirst().idx;
-        if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-            && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
+        if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+            && s.numRows > fIdx + numPreceding + numFollowing) {
           s.valueChain.removeFirst();
         }
       }
@@ -288,13 +288,13 @@ public class GenericUDAFFirstValue extends 
AbstractGenericUDAFResolver {
       State s = (State) agg;
       ValIndexPair r = s.valueChain.size() == 0 ? null : 
s.valueChain.getFirst();
 
-      for (int i = 0; i < s.numFollowing; i++) {
+      for (int i = 0; i < numFollowing; i++) {
         s.results.add(r == null ? null : r.val);
         s.numRows++;
         if (r != null) {
           int fIdx = (Integer) r.idx;
-          if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && s.numRows > fIdx + s.numPreceding + s.numFollowing
+          if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+              && s.numRows > fIdx + numPreceding + numFollowing
               && !s.valueChain.isEmpty()) {
             s.valueChain.removeFirst();
             r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index e099154..3ed6de7 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -154,8 +154,8 @@ public class GenericUDAFLastValue extends 
AbstractGenericUDAFResolver {
       private Object lastValue;
       private int lastIdx;
 
-      public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-        super(numPreceding, numFollowing, buf);
+      public State(AggregationBuffer buf) {
+        super(buf);
         lastValue = null;
         lastIdx = -1;
       }
@@ -192,7 +192,7 @@ public class GenericUDAFLastValue extends 
AbstractGenericUDAFResolver {
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-      return new State(numPreceding, numFollowing, underlying);
+      return new State(underlying);
     }
 
     protected ObjectInspector inputOI() {
@@ -219,14 +219,14 @@ public class GenericUDAFLastValue extends 
AbstractGenericUDAFResolver {
         s.lastValue = o;
         s.lastIdx = s.numRows;
       } else if (lb.skipNulls && s.lastIdx != -1) {
-        if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-            && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
+        if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+            && s.numRows > s.lastIdx + numPreceding + numFollowing) {
           s.lastValue = null;
           s.lastIdx = -1;
         }
       }
 
-      if (s.numRows >= (s.numFollowing)) {
+      if (s.numRows >= (numFollowing)) {
         s.results.add(s.lastValue);
       }
       s.numRows++;
@@ -238,14 +238,14 @@ public class GenericUDAFLastValue extends 
AbstractGenericUDAFResolver {
       LastValueBuffer lb = (LastValueBuffer) s.wrappedBuf;
 
       if (lb.skipNulls && s.lastIdx != -1) {
-        if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-            && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
+        if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+            && s.numRows > s.lastIdx + numPreceding + numFollowing) {
           s.lastValue = null;
           s.lastIdx = -1;
         }
       }
 
-      for (int i = 0; i < s.numFollowing; i++) {
+      for (int i = 0; i < numFollowing; i++) {
         s.results.add(s.lastValue);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index a153818..6b7808a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -166,8 +166,8 @@ public class GenericUDAFMax extends 
AbstractGenericUDAFResolver {
     class State extends GenericUDAFStreamingEvaluator<Object>.StreamingState {
       private final Deque<Object[]> maxChain;
 
-      public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
-        super(numPreceding, numFollowing, buf);
+      public State(AggregationBuffer buf) {
+        super(buf);
         maxChain = new ArrayDeque<Object[]>(numPreceding + numFollowing + 1);
       }
 
@@ -209,7 +209,7 @@ public class GenericUDAFMax extends 
AbstractGenericUDAFResolver {
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-      return new State(numPreceding, numFollowing, underlying);
+      return new State(underlying);
     }
 
     protected ObjectInspector inputOI() {
@@ -240,21 +240,21 @@ public class GenericUDAFMax extends 
AbstractGenericUDAFResolver {
        * to be tracked. - current max will never become out of range. It can
        * only be replaced by a larger max.
        */
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+      if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
           || s.maxChain.isEmpty()) {
         o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o,
             inputOI(), ObjectInspectorCopyOption.JAVA);
         s.maxChain.addLast(new Object[] { o, s.numRows });
       }
 
-      if (s.numRows >= (s.numFollowing)) {
+      if (s.numRows >= numFollowing) {
         s.results.add(s.maxChain.getFirst()[0]);
       }
       s.numRows++;
 
       int fIdx = (Integer) s.maxChain.getFirst()[1];
-      if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-          && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
+      if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+          && s.numRows > fIdx + numPreceding + numFollowing) {
         s.maxChain.removeFirst();
       }
     }
@@ -279,12 +279,12 @@ public class GenericUDAFMax extends 
AbstractGenericUDAFResolver {
       State s = (State) agg;
       Object[] r = s.maxChain.getFirst();
 
-      for (int i = 0; i < s.numFollowing; i++) {
+      for (int i = 0; i < numFollowing; i++) {
         s.results.add(r[0]);
         s.numRows++;
         int fIdx = (Integer) r[1];
-        if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-            && s.numRows - s.numFollowing + i > fIdx + s.numPreceding
+        if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+            && s.numRows - numFollowing + i > fIdx + numPreceding
             && !s.maxChain.isEmpty()) {
           s.maxChain.removeFirst();
           r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
index d68c085..578c356 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
 import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -32,28 +32,44 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
     GenericUDAFEvaluator implements ISupportStreamingModeForWindowing {
 
   protected final GenericUDAFEvaluator wrappedEval;
+  protected final WindowFrameDef wFrameDef;
+
   protected final int numPreceding;
   protected final int numFollowing;
 
+  /**
+   * @param wrappedEval
+   * @param numPreceding
+   * @param numFollowing
+   * @deprecated use the constructor that takes a WindowFrameDef instead
+   */
   public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval,
       int numPreceding, int numFollowing) {
     this.wrappedEval = wrappedEval;
+    this.wFrameDef = null;
+    this.mode = wrappedEval.mode;
+
     this.numPreceding = numPreceding;
     this.numFollowing = numFollowing;
+  }
+
+  public GenericUDAFStreamingEvaluator(GenericUDAFEvaluator wrappedEval,
+      WindowFrameDef wFrameDef) {
+    this.wrappedEval = wrappedEval;
+    this.wFrameDef = wFrameDef;
     this.mode = wrappedEval.mode;
+
+    this.numPreceding = -1;
+    this.numFollowing = -1;
   }
 
   class StreamingState extends AbstractAggregationBuffer {
     final AggregationBuffer wrappedBuf;
-    final int numPreceding;
-    final int numFollowing;
     final List<T1> results;
-    int numRows;
+    int numRows;  // Number of rows processed in the partition.
 
-    StreamingState(int numPreceding, int numFollowing, AggregationBuffer buf) {
+    StreamingState(AggregationBuffer buf) {
       this.wrappedBuf = buf;
-      this.numPreceding = numPreceding;
-      this.numFollowing = numFollowing;
       results = new ArrayList<T1>();
       numRows = 0;
     }
@@ -105,18 +121,16 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
   public static abstract class SumAvgEnhancer<T1, T2> extends
       GenericUDAFStreamingEvaluator<T1> {
 
-    public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, int numPreceding,
-        int numFollowing) {
-      super(wrappedEval, numPreceding, numFollowing);
+    public SumAvgEnhancer(GenericUDAFEvaluator wrappedEval, WindowFrameDef 
wFrameDef) {
+      super(wrappedEval, wFrameDef);
     }
 
     class SumAvgStreamingState extends StreamingState {
 
       final List<T2> intermediateVals;
 
-      SumAvgStreamingState(int numPreceding, int numFollowing,
-          AggregationBuffer buf) {
-        super(numPreceding, numFollowing, buf);
+      SumAvgStreamingState(AggregationBuffer buf) {
+        super(buf);
         intermediateVals = new ArrayList<T2>();
       }
 
@@ -129,7 +143,7 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
         if (underlying == -1) {
           return -1;
         }
-        if (numPreceding == BoundarySpec.UNBOUNDED_AMOUNT) {
+        if (wFrameDef.isStartUnbounded()) {
           return -1;
         }
         /*
@@ -138,7 +152,7 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
          * of underlying * wdwSz sz of intermediates = sz of underlying * wdwSz
          */
 
-        int wdwSz = numPreceding + numFollowing + 1;
+        int wdwSz = wFrameDef.getWindowSize();
         return underlying + (underlying * wdwSz) + (underlying * wdwSz)
             + (3 * JavaDataModel.PRIMITIVES1);
       }
@@ -147,12 +161,33 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
         intermediateVals.clear();
         super.reset();
       }
+
+      /**
+       * After the number of rows processed is more than the size of FOLLOWING 
window,
+       * we can generate a PTF result for a previous row when a new row gets 
processed.
+       * @return true if numRows has reached the FOLLOWING amount of the frame
+       */
+      public boolean hasResultReady() {
+        return this.numRows >= wFrameDef.getEnd().getAmt();
+      }
+
+      /**
+       * Retrieve the next stored intermediate result to generate the result 
for next available row
+       */
+      public T2 retrieveNextIntermediateValue() {
+        if (!wFrameDef.getStart().isUnbounded()
+            && (this.numRows - wFrameDef.getEnd().getAmt()) >= 
(wFrameDef.getStart().getAmt() + 1)) {
+          return this.intermediateVals.remove(0);
+        }
+
+        return null;
+      }
     }
 
     @Override
     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
       AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
-      return new SumAvgStreamingState(numPreceding, numFollowing, underlying);
+      return new SumAvgStreamingState(underlying);
     }
 
     @Override
@@ -161,11 +196,11 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
       SumAvgStreamingState ss = (SumAvgStreamingState) agg;
 
       wrappedEval.iterate(ss.wrappedBuf, parameters);
-
-      if (ss.numRows >= ss.numFollowing) {
+      // Generate the result for a previous row, of whose window all the rows 
have been processed.
+      if (ss.hasResultReady()) {
         ss.results.add(getNextResult(ss));
       }
-      if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT) {
+      if (!wFrameDef.isStartUnbounded()) {
         ss.intermediateVals.add(getCurrentIntermediateResult(ss));
       }
 
@@ -177,10 +212,12 @@ public abstract class GenericUDAFStreamingEvaluator<T1> 
extends
       SumAvgStreamingState ss = (SumAvgStreamingState) agg;
       Object o = wrappedEval.terminate(ss.wrappedBuf);
 
-      for (int i = 0; i < ss.numFollowing; i++) {
+      // After all the rows are processed, continue to generate results for 
the rows whose results have not been generated yet.
+      for (int i = 0; i < wFrameDef.getEnd().getAmt(); i++) {
         ss.results.add(getNextResult(ss));
         ss.numRows++;
       }
+
       return o;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index ffb7093..5a5846e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -205,13 +205,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-
-      BoundaryDef start = wFrmDef.getStart();
-      BoundaryDef end = wFrmDef.getEnd();
-
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
       return new 
GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
-          this, start.getAmt(), end.getAmt()) {
+          this, wFrameDef) {
 
         @Override
         protected HiveDecimalWritable getNextResult(
@@ -219,10 +215,8 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
             throws HiveException {
           SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) ss.wrappedBuf;
           HiveDecimal r = myagg.empty ? null : myagg.sum;
-          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-            HiveDecimal d = (HiveDecimal) ss.intermediateVals.remove(0);
-            d = d == null ? HiveDecimal.ZERO : d;
+          HiveDecimal d = ss.retrieveNextIntermediateValue();
+          if (d != null ) {
             r = r == null ? null : r.subtract(d);
           }
 
@@ -325,12 +319,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-      BoundaryDef start = wFrmDef.getStart();
-      BoundaryDef end = wFrmDef.getEnd();
-
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
       return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, 
Double>(this,
-          start.getAmt(), end.getAmt()) {
+          wFrameDef) {
 
         @Override
         protected DoubleWritable getNextResult(
@@ -338,10 +329,8 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
             throws HiveException {
           SumDoubleAgg myagg = (SumDoubleAgg) ss.wrappedBuf;
           Double r = myagg.empty ? null : myagg.sum;
-          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-            Double d = (Double) ss.intermediateVals.remove(0);
-            d = d == null ? 0.0 : d;
+          Double d = ss.retrieveNextIntermediateValue();
+          if (d != null) {
             r = r == null ? null : r - d;
           }
 
@@ -442,13 +431,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-
-      BoundaryDef start = wFrmDef.getStart();
-      BoundaryDef end = wFrmDef.getEnd();
-
+    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
       return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, 
Long>(this,
-          start.getAmt(), end.getAmt()) {
+          wFrameDef) {
 
         @Override
         protected LongWritable getNextResult(
@@ -456,10 +441,8 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
             throws HiveException {
           SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
           Long r = myagg.empty ? null : myagg.sum;
-          if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
-              && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
-            Long d = (Long) ss.intermediateVals.remove(0);
-            d = d == null ? 0 : d;
+          Long d = ss.retrieveNextIntermediateValue();
+          if (d != null) {
             r = r == null ? null : r - d;
           }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/154c662d/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java 
b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
index a331e66..88cafc0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udaf/TestStreamingSum.java
@@ -50,7 +50,6 @@ import org.junit.Test;
 public class TestStreamingSum {
 
   public static WindowFrameDef wdwFrame(int p, int f) {
-    WindowFrameDef wFrmDef = new WindowFrameDef();
     BoundaryDef start, end;
     if (p == 0) {
       start = new CurrentRowDef();
@@ -69,9 +68,8 @@ public class TestStreamingSum {
       endR.setAmt(f);
       end = endR;
     }
-    wFrmDef.setStart(start);
-    wFrmDef.setEnd(end);
-    return wFrmDef;
+
+    return new WindowFrameDef(start, end);
   }
 
   public void sumDouble(Iterator<Double> inVals, int inSz, int numPreceding,

Reply via email to