Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3247#discussion_r27717124
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
    @@ -17,285 +17,159 @@
     
     package org.apache.spark.sql.catalyst.expressions
     
    -import com.clearspring.analytics.stream.cardinality.HyperLogLog
    -
     import org.apache.spark.sql.types._
    -import org.apache.spark.sql.catalyst.trees
    -import org.apache.spark.sql.catalyst.errors.TreeNodeException
    -import org.apache.spark.util.collection.OpenHashSet
    -
    -abstract class AggregateExpression extends Expression {
    -  self: Product =>
     
    -  /**
    -   * Creates a new instance that can be used to compute this aggregate 
expression for a group
    -   * of input rows/
    -   */
    -  def newInstance(): AggregateFunction
     
    -  /**
    -   * [[AggregateExpression.eval]] should never be invoked because 
[[AggregateExpression]]'s are
    -   * replaced with a physical aggregate operator at runtime.
    -   */
    -  override def eval(input: Row = null): EvaluatedType =
    -    throw new TreeNodeException(this, s"No function to evaluate 
expression. type: ${this.nodeName}")
    -}
    +/**
    + * This is from 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode
    + * Just a hint for the UDAF developers which stage we are about to process,
    + * However, we probably don't want the developers knows so many details, 
here
    + * is just for keep consistent with Hive (when integrated with Hive), need 
to
    + * figure out if we have work around for that soon.
    + */
    +@deprecated
    +trait Mode
     
     /**
    - * Represents an aggregation that has been rewritten to be performed in 
two steps.
    - *
    - * @param finalEvaluation an aggregate expression that evaluates to same 
final result as the
    - *                        original aggregation.
    - * @param partialEvaluations A sequence of [[NamedExpression]]s that can 
be computed on partial
    - *                           data sets and are required to compute the 
`finalEvaluation`.
    + * PARTIAL1: from original data to partial aggregation data: iterate() and
    + * terminatePartial() will be called.
      */
    -case class SplitEvaluation(
    -    finalEvaluation: Expression,
    -    partialEvaluations: Seq[NamedExpression])
    +@deprecated
    +case object PARTIAL1 extends Mode
     
     /**
    - * An [[AggregateExpression]] that can be partially computed without 
seeing all relevant tuples.
    - * These partial evaluations can then be combined to compute the actual 
answer.
    + * PARTIAL2: from partial aggregation data to partial aggregation data:
    + * merge() and terminatePartial() will be called.
      */
    -abstract class PartialAggregate extends AggregateExpression {
    -  self: Product =>
    +@deprecated
    +case object PARTIAL2 extends Mode
    +/**
    + * FINAL: from partial aggregation to full aggregation: merge() and
    + * terminate() will be called.
    + */
    +@deprecated
    +case object FINAL extends Mode
    +/**
    + * COMPLETE: from original data directly to full aggregation: iterate() and
    + * terminate() will be called.
    + */
    +@deprecated
    +case object COMPLETE extends Mode
     
    -  /**
    -   * Returns a [[SplitEvaluation]] that computes this aggregation using 
partial aggregation.
    -   */
    -  def asPartial: SplitEvaluation
    -}
     
     /**
    - * A specific implementation of an aggregate function. Used to wrap a 
generic
    - * [[AggregateExpression]] with an algorithm that will be used to compute 
one specific result.
    + * Aggregation Function Interface
    + * All of the function will be called within Spark executors.
      */
    -abstract class AggregateFunction
    -  extends AggregateExpression with Serializable with 
trees.LeafNode[Expression] {
    +trait AggregateFunction {
       self: Product =>
     
    -  override type EvaluatedType = Any
    -
    -  /** Base should return the generic aggregate expression that this 
function is computing */
    -  val base: AggregateExpression
    -
    -  override def nullable: Boolean = base.nullable
    -  override def dataType: DataType = base.dataType
    +  // Specify the BoundReference for Aggregate Buffer
    +  def initialBoundReference(buffers: Seq[BoundReference]): Unit
     
    -  def update(input: Row): Unit
    +  // Initialize (reinitialize) the aggregation buffer
    +  def reset(buf: MutableRow): Unit
     
    -  // Do we really need this?
    -  override def newInstance(): AggregateFunction = {
    -    makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
    -  }
    -}
    +  // Expect the aggregate function fills the aggregation buffer when
    +  // fed with each value in the group
    +  def iterate(arguments: Any, buf: MutableRow): Unit
     
    -case class Min(child: Expression) extends PartialAggregate with 
trees.UnaryNode[Expression] {
    +  // Merge 2 aggregation buffer, and write back to the later one
    +  def merge(value: Row, buf: MutableRow): Unit
     
    -  override def nullable: Boolean = true
    -  override def dataType: DataType = child.dataType
    -  override def toString: String = s"MIN($child)"
    -
    -  override def asPartial: SplitEvaluation = {
    -    val partialMin = Alias(Min(child), "PartialMin")()
    -    SplitEvaluation(Min(partialMin.toAttribute), partialMin :: Nil)
    -  }
    -
    -  override def newInstance(): MinFunction = new MinFunction(child, this)
    -}
    -
    -case class MinFunction(expr: Expression, base: AggregateExpression) 
extends AggregateFunction {
    -  def this() = this(null, null) // Required for serialization.
    -
    -  val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType)
    -  val cmp = GreaterThan(currentMin, expr)
    -
    -  override def update(input: Row): Unit = {
    -    if (currentMin.value == null) {
    -      currentMin.value = expr.eval(input)
    -    } else if(cmp.eval(input) == true) {
    -      currentMin.value = expr.eval(input)
    -    }
    -  }
    -
    -  override def eval(input: Row): Any = currentMin.value
    -}
    -
    -case class Max(child: Expression) extends PartialAggregate with 
trees.UnaryNode[Expression] {
    -
    -  override def nullable: Boolean = true
    -  override def dataType: DataType = child.dataType
    -  override def toString: String = s"MAX($child)"
    -
    -  override def asPartial: SplitEvaluation = {
    -    val partialMax = Alias(Max(child), "PartialMax")()
    -    SplitEvaluation(Max(partialMax.toAttribute), partialMax :: Nil)
    -  }
    -
    -  override def newInstance(): MaxFunction = new MaxFunction(child, this)
    -}
    -
    -case class MaxFunction(expr: Expression, base: AggregateExpression) 
extends AggregateFunction {
    -  def this() = this(null, null) // Required for serialization.
    -
    -  val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType)
    -  val cmp = LessThan(currentMax, expr)
    -
    -  override def update(input: Row): Unit = {
    -    if (currentMax.value == null) {
    -      currentMax.value = expr.eval(input)
    -    } else if(cmp.eval(input) == true) {
    -      currentMax.value = expr.eval(input)
    -    }
    -  }
    +  // Semantically we probably don't need this, however, we need it when
    +  // integrating with Hive UDAF(GenericUDAF)
    +  @deprecated
    +  def terminatePartial(buf: MutableRow): Unit = {}
     
    -  override def eval(input: Row): Any = currentMax.value
    +  // Output the final result by feeding the aggregation buffer
    +  def terminate(input: Row): Any
     }
     
    -case class Count(child: Expression) extends PartialAggregate with 
trees.UnaryNode[Expression] {
    +trait AggregateExpression extends Expression with AggregateFunction {
    +  self: Product =>
    +  type EvaluatedType = Any
     
    -  override def nullable: Boolean = false
    -  override def dataType: LongType.type = LongType
    -  override def toString: String = s"COUNT($child)"
    +  var mode: Mode = COMPLETE
     
    -  override def asPartial: SplitEvaluation = {
    -    val partialCount = Alias(Count(child), "PartialCount")()
    -    SplitEvaluation(Coalesce(Seq(Sum(partialCount.toAttribute), 
Literal(0L))), partialCount :: Nil)
    +  def initial(m: Mode): Unit = {
    +    this.mode = m
       }
     
    -  override def newInstance(): CountFunction = new CountFunction(child, 
this)
    -}
    -
    -case class CountDistinct(expressions: Seq[Expression]) extends 
PartialAggregate {
    -  def this() = this(null)
    +  // Aggregation Buffer data types
    +  def bufferDataType: Seq[DataType] = Nil
    +  // Is it a distinct aggregate expression?
    +  def distinct: Boolean
    +  // Is it a distinct like aggregate expression (e.g. Min/Max is 
distinctLike, while avg is not)
    +  def distinctLike: Boolean = false
    --- End diff --
    
    I think `distinctLike` should be removed in this patch.
    That is, all the code intended for future optimization should be removed to 
simplify this patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to