Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/3247#discussion_r27717364 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala --- @@ -17,285 +17,159 @@ package org.apache.spark.sql.catalyst.expressions -import com.clearspring.analytics.stream.cardinality.HyperLogLog - import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.trees -import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.util.collection.OpenHashSet - -abstract class AggregateExpression extends Expression { - self: Product => - /** - * Creates a new instance that can be used to compute this aggregate expression for a group - * of input rows/ - */ - def newInstance(): AggregateFunction - /** - * [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are - * replaced with a physical aggregate operator at runtime. - */ - override def eval(input: Row = null): EvaluatedType = - throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}") -} +/** + * This is from org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode + * Just a hint for the UDAF developers which stage we are about to process, + * However, we probably don't want the developers knows so many details, here + * is just for keep consistent with Hive (when integrated with Hive), need to + * figure out if we have work around for that soon. + */ +@deprecated +trait Mode /** - * Represents an aggregation that has been rewritten to be performed in two steps. - * - * @param finalEvaluation an aggregate expression that evaluates to same final result as the - * original aggregation. - * @param partialEvaluations A sequence of [[NamedExpression]]s that can be computed on partial - * data sets and are required to compute the `finalEvaluation`. 
+ * PARTIAL1: from original data to partial aggregation data: iterate() and + * terminatePartial() will be called. */ -case class SplitEvaluation( - finalEvaluation: Expression, - partialEvaluations: Seq[NamedExpression]) +@deprecated +case object PARTIAL1 extends Mode /** - * An [[AggregateExpression]] that can be partially computed without seeing all relevant tuples. - * These partial evaluations can then be combined to compute the actual answer. + * PARTIAL2: from partial aggregation data to partial aggregation data: + * merge() and terminatePartial() will be called. */ -abstract class PartialAggregate extends AggregateExpression { - self: Product => +@deprecated +case object PARTIAL2 extends Mode +/** + * FINAL: from partial aggregation to full aggregation: merge() and + * terminate() will be called. + */ +@deprecated +case object FINAL extends Mode +/** + * COMPLETE: from original data directly to full aggregation: iterate() and + * terminate() will be called. + */ +@deprecated +case object COMPLETE extends Mode - /** - * Returns a [[SplitEvaluation]] that computes this aggregation using partial aggregation. - */ - def asPartial: SplitEvaluation -} /** - * A specific implementation of an aggregate function. Used to wrap a generic - * [[AggregateExpression]] with an algorithm that will be used to compute one specific result. + * Aggregation Function Interface + * All of the function will be called within Spark executors. 
*/ -abstract class AggregateFunction - extends AggregateExpression with Serializable with trees.LeafNode[Expression] { +trait AggregateFunction { self: Product => - override type EvaluatedType = Any - - /** Base should return the generic aggregate expression that this function is computing */ - val base: AggregateExpression - - override def nullable: Boolean = base.nullable - override def dataType: DataType = base.dataType + // Specify the BoundReference for Aggregate Buffer + def initialBoundReference(buffers: Seq[BoundReference]): Unit - def update(input: Row): Unit + // Initialize (reinitialize) the aggregation buffer + def reset(buf: MutableRow): Unit - // Do we really need this? - override def newInstance(): AggregateFunction = { - makeCopy(productIterator.map { case a: AnyRef => a }.toArray) - } -} + // Expect the aggregate function fills the aggregation buffer when + // fed with each value in the group + def iterate(arguments: Any, buf: MutableRow): Unit -case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { + // Merge 2 aggregation buffer, and write back to the later one + def merge(value: Row, buf: MutableRow): Unit - override def nullable: Boolean = true - override def dataType: DataType = child.dataType - override def toString: String = s"MIN($child)" - - override def asPartial: SplitEvaluation = { - val partialMin = Alias(Min(child), "PartialMin")() - SplitEvaluation(Min(partialMin.toAttribute), partialMin :: Nil) - } - - override def newInstance(): MinFunction = new MinFunction(child, this) -} - -case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. 
- - val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType) - val cmp = GreaterThan(currentMin, expr) - - override def update(input: Row): Unit = { - if (currentMin.value == null) { - currentMin.value = expr.eval(input) - } else if(cmp.eval(input) == true) { - currentMin.value = expr.eval(input) - } - } - - override def eval(input: Row): Any = currentMin.value -} - -case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { - - override def nullable: Boolean = true - override def dataType: DataType = child.dataType - override def toString: String = s"MAX($child)" - - override def asPartial: SplitEvaluation = { - val partialMax = Alias(Max(child), "PartialMax")() - SplitEvaluation(Max(partialMax.toAttribute), partialMax :: Nil) - } - - override def newInstance(): MaxFunction = new MaxFunction(child, this) -} - -case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. - - val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType) - val cmp = LessThan(currentMax, expr) - - override def update(input: Row): Unit = { - if (currentMax.value == null) { - currentMax.value = expr.eval(input) - } else if(cmp.eval(input) == true) { - currentMax.value = expr.eval(input) - } - } + // Semantically we probably don't need this, however, we need it when + // integrating with Hive UDAF(GenericUDAF) + @deprecated + def terminatePartial(buf: MutableRow): Unit = {} --- End diff -- We need to write generic code, not Hive-specific code, in o.a.spark.sql.core.*. If this interface is not essential for this patch, it should be removed.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org