Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8973#discussion_r41579349
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -95,98 +92,192 @@ private[sql] case class AggregateExpression2(
       override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
     }
     
    -abstract class AggregateFunction2
    -  extends Expression with ImplicitCastInputTypes {
    +/**
    + * AggregateFunction2 is the superclass of two aggregation function interfaces:
    + *
    + *  - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of
    + *    initialize(), update(), and merge() functions that operate on Row-based aggregation buffers.
    + *  - [[ExpressionAggregate]] is for aggregation functions that are specified using
    + *    Catalyst expressions.
    + *
    + * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes
    + * ([[aggBufferAttributes]]) of an aggregation buffer which is used to hold partial aggregate
    + * results. At runtime, multiple aggregate functions are evaluated by the same operator using a
    + * combined aggregation buffer which concatenates the aggregation buffers of the individual
    + * aggregate functions.
    + *
    + * Code which accepts [[AggregateFunction2]] instances should be prepared to handle both types of
    + * aggregate functions.
    + */
    +sealed abstract class AggregateFunction2 extends Expression with ImplicitCastInputTypes {
     
       /** An aggregate function is not foldable. */
       final override def foldable: Boolean = false
     
    +  /** The schema of the aggregation buffer. */
    +  def aggBufferSchema: StructType
    +
    +  /** Attributes of fields in aggBufferSchema. */
    +  def aggBufferAttributes: Seq[AttributeReference]
    +
       /**
    -   * The offset of this function's start buffer value in the
    -   * underlying shared mutable aggregation buffer.
    -   * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share
    -   * the same aggregation buffer. In this shared buffer, the position of the first
    -   * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)`
    -   * will be 2.
    +   * Attributes of fields in input aggregation buffers (immutable aggregation buffers that are
    +   * merged with mutable aggregation buffers in the merge() function or merge expressions).
    +   * These attributes are created automatically by cloning the [[aggBufferAttributes]].
        */
    -  protected var mutableBufferOffset: Int = 0
    -
    -  def withNewMutableBufferOffset(newMutableBufferOffset: Int): Unit = {
    -    mutableBufferOffset = newMutableBufferOffset
    -  }
    +  def inputAggBufferAttributes: Seq[AttributeReference]
     
       /**
    -   * The offset of this function's start buffer value in the
    -   * underlying shared input aggregation buffer. An input aggregation buffer is used
    -   * when we merge two aggregation buffers and it is basically the immutable one
    -   * (we merge an input aggregation buffer and a mutable aggregation buffer and
    -   * then store the new buffer values to the mutable aggregation buffer).
    -   * Usually, an input aggregation buffer also contain extra elements like grouping
    -   * keys at the beginning. So, mutableBufferOffset and inputBufferOffset are often
    -   * different.
    -   * For example, we have a grouping expression `key``, and two aggregate functions
    -   * `avg(x)` and `avg(y)`. In this shared input aggregation buffer, the position of the first
    -   * buffer value of `avg(x)` will be 1 and the position of the first buffer value of `avg(y)`
    -   * will be 3 (position 0 is used for the value of key`).
    +   * Indicates if this function supports partial aggregation.
    +   * Currently Hive UDAF is the only one that doesn't support partial aggregation.
        */
    -  protected var inputBufferOffset: Int = 0
    +  def supportsPartial: Boolean = true
     
    -  def withNewInputBufferOffset(newInputBufferOffset: Int): Unit = {
    -    inputBufferOffset = newInputBufferOffset
    -  }
    +  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
    +    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
    +}
     
    -  /** The schema of the aggregation buffer. */
    -  def bufferSchema: StructType
    +/**
    + * API for aggregation functions that are expressed in terms of imperative initialize(), update(),
    + * and merge() functions which operate on Row-based aggregation buffers.
    + *
    + * Within these functions, code should access fields of the mutable aggregation buffer by adding the
    + * bufferSchema-relative field number to `mutableAggBufferOffset` then using this new field number
    + * to access the buffer Row. This is necessary because this aggregation function's buffer is
    + * embedded inside of a larger shared aggregation buffer when an aggregation operator evaluates
    + * multiple aggregate functions at the same time.
    + *
    + * We need to perform similar field number arithmetic when merging multiple intermediate
    + * aggregate buffers together in `merge()` (in this case, use `inputAggBufferOffset` when accessing
    + * the input buffer).
    + */
    +abstract class ImperativeAggregate extends AggregateFunction2 {
     
    -  /** Attributes of fields in bufferSchema. */
    -  def bufferAttributes: Seq[AttributeReference]
    +  /**
    +   * The offset of this function's first buffer value in the underlying shared mutable aggregation
    +   * buffer.
    +   *
    +   * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share the same
    +   * aggregation buffer. In this shared buffer, the position of the first buffer value of `avg(x)`
    +   * will be 0 and the position of the first buffer value of `avg(y)` will be 2:
    +   *
    +   *          avg(x) mutableAggBufferOffset = 0
    +   *                  |
    +   *                  v
    +   *                  +--------+--------+--------+--------+
    +   *                  |  sum1  | count1 |  sum2  | count2 |
    +   *                  +--------+--------+--------+--------+
    +   *                                    ^
    +   *                                    |
    +   *                     avg(y) mutableAggBufferOffset = 2
    +   *
    +   */
    +  protected var mutableAggBufferOffset: Int = 0
     
    -  /** Clones bufferAttributes. */
    -  def cloneBufferAttributes: Seq[Attribute]
    +  def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Unit = {
    --- End diff --
    
    Since this method returns `Unit` instead of `ImperativeAggregate`, maybe `setXXX` is a better name than `withXXX`?
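
To illustrate the naming point, here is a minimal sketch of the two conventions side by side. `MutableOffsetHolder`, `ImmutableOffsetHolder`, and `setMutableAggBufferOffset` are hypothetical names used only for this example; they are not part of the patch or of Spark. The assumption is the common Scala convention that a `with` prefix returns a modified copy, while a `set` prefix mutates the receiver and returns `Unit`.

```scala
// Hypothetical stand-in for a mutable aggregate function; not a Spark class.
final class MutableOffsetHolder {
  private var mutableAggBufferOffset: Int = 0

  // Mutates the receiver and returns Unit, so a `set` prefix describes it accurately.
  def setMutableAggBufferOffset(newOffset: Int): Unit = {
    mutableAggBufferOffset = newOffset
  }

  def offset: Int = mutableAggBufferOffset
}

// Hypothetical immutable variant: here a `with` prefix fits, because the
// method leaves the receiver untouched and returns a modified copy.
final case class ImmutableOffsetHolder(mutableAggBufferOffset: Int = 0) {
  def withNewMutableAggBufferOffset(newOffset: Int): ImmutableOffsetHolder =
    copy(mutableAggBufferOffset = newOffset)
}

object NamingDemo {
  def main(args: Array[String]): Unit = {
    val mutableHolder = new MutableOffsetHolder
    mutableHolder.setMutableAggBufferOffset(2) // changes mutableHolder in place
    println(mutableHolder.offset)

    val original = ImmutableOffsetHolder()
    val updated = original.withNewMutableAggBufferOffset(2) // original is unchanged
    println(original.mutableAggBufferOffset)
    println(updated.mutableAggBufferOffset)
  }
}
```

Either shape would be consistent; the mismatch is only in pairing a `with` name with a `Unit` return type.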

