GitHub user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8973#discussion_r41220510
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -95,98 +92,186 @@ private[sql] case class AggregateExpression2(
       override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
     }
     
    -abstract class AggregateFunction2
    -  extends Expression with ImplicitCastInputTypes {
    +/**
    + * AggregateFunction2 is the superclass of two aggregation function interfaces:
    + *
    + *  - [[ImperativeAggregateFunction]] is for aggregation functions that are specified in terms of
    + *    initialize(), update(), and merge() functions that operate on Row-based aggregation buffers.
    + *  - [[ExpressionAggregateFunction]] is for aggregation functions that are specified using
    + *    Catalyst expressions.
    + *
    + * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes
    + * ([[aggBufferAttributes]]) of an aggregation buffer which is used to hold partial aggregate results.
    + * At runtime, multiple aggregate functions are evaluated by the same operator using a combined
    + * aggregation buffer which concatenates the aggregation buffers of the individual aggregate
    + * functions.
    + *
    + * Code which accepts [[AggregateFunction2]] instances should be prepared to handle both types of
    + * aggregate functions.
    + */
    +sealed abstract class AggregateFunction2 extends Expression with ImplicitCastInputTypes {
     
       /** An aggregate function is not foldable. */
       final override def foldable: Boolean = false
     
    +  /** The schema of the aggregation buffer. */
    +  def aggBufferSchema: StructType
    +
    +  /** Attributes of fields in aggBufferSchema. */
    +  def aggBufferAttributes: Seq[AttributeReference]
    +
       /**
    -   * The offset of this function's start buffer value in the
    -   * underlying shared mutable aggregation buffer.
    -   * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share
    -   * the same aggregation buffer. In this shared buffer, the position of the first
    -   * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)`
    -   * will be 2.
    +   * Attributes of fields in input aggregation buffers (immutable aggregation buffers that are
    +   * merged with mutable aggregation buffers in the merge() function or merge expressions).
    +   * These attributes are created automatically by cloning the [[aggBufferAttributes]].
        */
    -  protected var mutableBufferOffset: Int = 0
    -
    -  def withNewMutableBufferOffset(newMutableBufferOffset: Int): Unit = {
    -    mutableBufferOffset = newMutableBufferOffset
    -  }
    +  final lazy val inputAggBufferAttributes: Seq[Attribute] = aggBufferAttributes.map(_.newInstance())
    --- End diff --
    
    This definition was duplicated in every subclass. I don't see any reason to let subclasses override it (doing so seems likely to lead to bugs), so it is now defined once here and marked `final`.
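
A minimal sketch of the pattern in the last added line of the diff, for readers without the Spark source at hand. `Attr`, `ExprIds`, `AggFunction`, and `Average` are hypothetical stand-ins, not Catalyst's real classes; only the shape of `inputAggBufferAttributes` is taken from the diff.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
object ExprIds {
  private var next = 0L
  // Hands out a new identifier on every call.
  def fresh(): Long = { next += 1; next }
}

final case class Attr(name: String, exprId: Long = ExprIds.fresh()) {
  // Same name, fresh identity: the "cloning" the doc comment refers to.
  def newInstance(): Attr = copy(exprId = ExprIds.fresh())
}

abstract class AggFunction {
  /** Attributes of the fields in this function's aggregation buffer. */
  def aggBufferAttributes: Seq[Attr]

  // Defined once in the base class. `final` prevents subclasses from
  // overriding it. `lazy` defers evaluation until first access; a strict
  // val would run during base-class construction, before a subclass that
  // implements aggBufferAttributes as a val has initialized it.
  final lazy val inputAggBufferAttributes: Seq[Attr] =
    aggBufferAttributes.map(_.newInstance())
}

class Average extends AggFunction {
  // Subclasses only describe their buffer; the cloned input attributes
  // are derived automatically by the base class.
  val aggBufferAttributes: Seq[Attr] = Seq(Attr("sum"), Attr("count"))
}
```

In this sketch, `new Average().inputAggBufferAttributes` yields attributes with the same names as the buffer attributes but distinct identifiers, and a subclass that tried to declare its own `inputAggBufferAttributes` would fail to compile.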

