Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/3247#discussion_r27717124 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala --- @@ -17,285 +17,159 @@ package org.apache.spark.sql.catalyst.expressions -import com.clearspring.analytics.stream.cardinality.HyperLogLog - import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.trees -import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.util.collection.OpenHashSet - -abstract class AggregateExpression extends Expression { - self: Product => - /** - * Creates a new instance that can be used to compute this aggregate expression for a group - * of input rows/ - */ - def newInstance(): AggregateFunction - /** - * [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are - * replaced with a physical aggregate operator at runtime. - */ - override def eval(input: Row = null): EvaluatedType = - throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}") -} +/** + * This is from org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode + * Just a hint for the UDAF developers which stage we are about to process, + * However, we probably don't want the developers knows so many details, here + * is just for keep consistent with Hive (when integrated with Hive), need to + * figure out if we have work around for that soon. + */ +@deprecated +trait Mode /** - * Represents an aggregation that has been rewritten to be performed in two steps. - * - * @param finalEvaluation an aggregate expression that evaluates to same final result as the - * original aggregation. - * @param partialEvaluations A sequence of [[NamedExpression]]s that can be computed on partial - * data sets and are required to compute the `finalEvaluation`. 
+ * PARTIAL1: from original data to partial aggregation data: iterate() and + * terminatePartial() will be called. */ -case class SplitEvaluation( - finalEvaluation: Expression, - partialEvaluations: Seq[NamedExpression]) +@deprecated +case object PARTIAL1 extends Mode /** - * An [[AggregateExpression]] that can be partially computed without seeing all relevant tuples. - * These partial evaluations can then be combined to compute the actual answer. + * PARTIAL2: from partial aggregation data to partial aggregation data: + * merge() and terminatePartial() will be called. */ -abstract class PartialAggregate extends AggregateExpression { - self: Product => +@deprecated +case object PARTIAL2 extends Mode +/** + * FINAL: from partial aggregation to full aggregation: merge() and + * terminate() will be called. + */ +@deprecated +case object FINAL extends Mode +/** + * COMPLETE: from original data directly to full aggregation: iterate() and + * terminate() will be called. + */ +@deprecated +case object COMPLETE extends Mode - /** - * Returns a [[SplitEvaluation]] that computes this aggregation using partial aggregation. - */ - def asPartial: SplitEvaluation -} /** - * A specific implementation of an aggregate function. Used to wrap a generic - * [[AggregateExpression]] with an algorithm that will be used to compute one specific result. + * Aggregation Function Interface + * All of the function will be called within Spark executors. 
*/ -abstract class AggregateFunction - extends AggregateExpression with Serializable with trees.LeafNode[Expression] { +trait AggregateFunction { self: Product => - override type EvaluatedType = Any - - /** Base should return the generic aggregate expression that this function is computing */ - val base: AggregateExpression - - override def nullable: Boolean = base.nullable - override def dataType: DataType = base.dataType + // Specify the BoundReference for Aggregate Buffer + def initialBoundReference(buffers: Seq[BoundReference]): Unit - def update(input: Row): Unit + // Initialize (reinitialize) the aggregation buffer + def reset(buf: MutableRow): Unit - // Do we really need this? - override def newInstance(): AggregateFunction = { - makeCopy(productIterator.map { case a: AnyRef => a }.toArray) - } -} + // Expect the aggregate function fills the aggregation buffer when + // fed with each value in the group + def iterate(arguments: Any, buf: MutableRow): Unit -case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { + // Merge 2 aggregation buffer, and write back to the later one + def merge(value: Row, buf: MutableRow): Unit - override def nullable: Boolean = true - override def dataType: DataType = child.dataType - override def toString: String = s"MIN($child)" - - override def asPartial: SplitEvaluation = { - val partialMin = Alias(Min(child), "PartialMin")() - SplitEvaluation(Min(partialMin.toAttribute), partialMin :: Nil) - } - - override def newInstance(): MinFunction = new MinFunction(child, this) -} - -case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. 
- - val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType) - val cmp = GreaterThan(currentMin, expr) - - override def update(input: Row): Unit = { - if (currentMin.value == null) { - currentMin.value = expr.eval(input) - } else if(cmp.eval(input) == true) { - currentMin.value = expr.eval(input) - } - } - - override def eval(input: Row): Any = currentMin.value -} - -case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { - - override def nullable: Boolean = true - override def dataType: DataType = child.dataType - override def toString: String = s"MAX($child)" - - override def asPartial: SplitEvaluation = { - val partialMax = Alias(Max(child), "PartialMax")() - SplitEvaluation(Max(partialMax.toAttribute), partialMax :: Nil) - } - - override def newInstance(): MaxFunction = new MaxFunction(child, this) -} - -case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. 
- - val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType) - val cmp = LessThan(currentMax, expr) - - override def update(input: Row): Unit = { - if (currentMax.value == null) { - currentMax.value = expr.eval(input) - } else if(cmp.eval(input) == true) { - currentMax.value = expr.eval(input) - } - } + // Semantically we probably don't need this, however, we need it when + // integrating with Hive UDAF(GenericUDAF) + @deprecated + def terminatePartial(buf: MutableRow): Unit = {} - override def eval(input: Row): Any = currentMax.value + // Output the final result by feeding the aggregation buffer + def terminate(input: Row): Any } -case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { +trait AggregateExpression extends Expression with AggregateFunction { + self: Product => + type EvaluatedType = Any - override def nullable: Boolean = false - override def dataType: LongType.type = LongType - override def toString: String = s"COUNT($child)" + var mode: Mode = COMPLETE - override def asPartial: SplitEvaluation = { - val partialCount = Alias(Count(child), "PartialCount")() - SplitEvaluation(Coalesce(Seq(Sum(partialCount.toAttribute), Literal(0L))), partialCount :: Nil) + def initial(m: Mode): Unit = { + this.mode = m } - override def newInstance(): CountFunction = new CountFunction(child, this) -} - -case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate { - def this() = this(null) + // Aggregation Buffer data types + def bufferDataType: Seq[DataType] = Nil + // Is it a distinct aggregate expression? + def distinct: Boolean + // Is it a distinct like aggregate expression (e.g. Min/Max is distinctLike, while avg is not) + def distinctLike: Boolean = false --- End diff -- I think `distinctLike` should be removed from this patch. More generally, any code that exists only for future optimizations should be removed to keep this patch simple.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org