Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/8973#discussion_r41579349 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala --- @@ -95,98 +92,192 @@ private[sql] case class AggregateExpression2( override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)" } -abstract class AggregateFunction2 - extends Expression with ImplicitCastInputTypes { +/** + * AggregateFunction2 is the superclass of two aggregation function interfaces: + * + * - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of + * initialize(), update(), and merge() functions that operate on Row-based aggregation buffers. + * - [[ExpressionAggregate]] is for aggregation functions that are specified using + * Catalyst expressions. + * + * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes + * ([[aggBufferAttributes]]) of an aggregation buffer which is used to hold partial aggregate + * results. At runtime, multiple aggregate functions are evaluated by the same operator using a + * combined aggregation buffer which concatenates the aggregation buffers of the individual + * aggregate functions. + * + * Code which accepts [[AggregateFunction2]] instances should be prepared to handle both types of + * aggregate functions. + */ +sealed abstract class AggregateFunction2 extends Expression with ImplicitCastInputTypes { /** An aggregate function is not foldable. */ final override def foldable: Boolean = false + /** The schema of the aggregation buffer. */ + def aggBufferSchema: StructType + + /** Attributes of fields in aggBufferSchema. */ + def aggBufferAttributes: Seq[AttributeReference] + /** - * The offset of this function's start buffer value in the - * underlying shared mutable aggregation buffer. 
- * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share - * the same aggregation buffer. In this shared buffer, the position of the first - * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)` - * will be 2. + * Attributes of fields in input aggregation buffers (immutable aggregation buffers that are + * merged with mutable aggregation buffers in the merge() function or merge expressions). + * These attributes are created automatically by cloning the [[aggBufferAttributes]]. */ - protected var mutableBufferOffset: Int = 0 - - def withNewMutableBufferOffset(newMutableBufferOffset: Int): Unit = { - mutableBufferOffset = newMutableBufferOffset - } + def inputAggBufferAttributes: Seq[AttributeReference] /** - * The offset of this function's start buffer value in the - * underlying shared input aggregation buffer. An input aggregation buffer is used - * when we merge two aggregation buffers and it is basically the immutable one - * (we merge an input aggregation buffer and a mutable aggregation buffer and - * then store the new buffer values to the mutable aggregation buffer). - * Usually, an input aggregation buffer also contain extra elements like grouping - * keys at the beginning. So, mutableBufferOffset and inputBufferOffset are often - * different. - * For example, we have a grouping expression `key``, and two aggregate functions - * `avg(x)` and `avg(y)`. In this shared input aggregation buffer, the position of the first - * buffer value of `avg(x)` will be 1 and the position of the first buffer value of `avg(y)` - * will be 3 (position 0 is used for the value of key`). + * Indicates if this function supports partial aggregation. + * Currently Hive UDAF is the only one that doesn't support partial aggregation. 
*/ - protected var inputBufferOffset: Int = 0 + def supportsPartial: Boolean = true - def withNewInputBufferOffset(newInputBufferOffset: Int): Unit = { - inputBufferOffset = newInputBufferOffset - } + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = + throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") +} - /** The schema of the aggregation buffer. */ - def bufferSchema: StructType +/** + * API for aggregation functions that are expressed in terms of imperative initialize(), update(), + * and merge() functions which operate on Row-based aggregation buffers. + * + * Within these functions, code should access fields of the mutable aggregation buffer by adding the + * bufferSchema-relative field number to `mutableAggBufferOffset` then using this new field number + * to access the buffer Row. This is necessary because this aggregation function's buffer is + * embedded inside of a larger shared aggregation buffer when an aggregation operator evaluates + * multiple aggregate functions at the same time. + * + * We need to perform similar field number arithmetic when merging multiple intermediate + * aggregate buffers together in `merge()` (in this case, use `inputAggBufferOffset` when accessing + * the input buffer). + */ +abstract class ImperativeAggregate extends AggregateFunction2 { - /** Attributes of fields in bufferSchema. */ - def bufferAttributes: Seq[AttributeReference] + /** + * The offset of this function's first buffer value in the underlying shared mutable aggregation + * buffer. + * + * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share the same + * aggregation buffer. 
In this shared buffer, the position of the first buffer value of `avg(x)` + * will be 0 and the position of the first buffer value of `avg(y)` will be 2: + * + * avg(x) mutableAggBufferOffset = 0 + * | + * v + * +--------+--------+--------+--------+ + * | sum1 | count1 | sum2 | count2 | + * +--------+--------+--------+--------+ + * ^ + * | + * avg(y) mutableAggBufferOffset = 2 + * + */ + protected var mutableAggBufferOffset: Int = 0 - /** Clones bufferAttributes. */ - def cloneBufferAttributes: Seq[Attribute] + def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Unit = { --- End diff -- Since this method returns `Unit` instead of `ImperativeAggregate`, maybe `setXXX` is a better name than `withXXX`?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and you wish to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org