Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14753#discussion_r75797539
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -389,3 +389,175 @@ abstract class DeclarativeAggregate
         def right: AttributeReference = inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
       }
     }
    +
    +/**
    + * Aggregation function which allows **arbitrary** user-defined java object to be used as internal
    + * aggregation buffer object.
    + *
    + * {{{
    + *                aggregation buffer for normal aggregation function `avg`
    + *                    |
    + *                    v
    + *                  +--------------+---------------+-----------------------------------+
    + *                  |  sum1 (Long) | count1 (Long) | generic user-defined java objects |
    + *                  +--------------+---------------+-----------------------------------+
    + *                                                     ^
    + *                                                     |
    + *                    Aggregation buffer object for `TypedImperativeAggregate` aggregation function
    + * }}}
    + *
    + * Work flow (Partial mode aggregate at Mapper side, and Final mode aggregate at Reducer side):
    + *
    + * Stage 1: Partial aggregate at Mapper side:
    + *
    + *  1. The framework calls `createAggregationBuffer(): T` to create an empty internal aggregation
    + *     buffer object.
    + *  2. Upon each input row, the framework calls
    + *     `update(buffer: T, input: InternalRow): Unit` to update the aggregation buffer object T.
    + *  3. After processing all rows of current group (group by key), the framework will serialize
    + *     aggregation buffer object T to SparkSQL internally supported underlying storage format, and
    + *     persist the serializable format to disk if needed.
    + *  4. The framework moves on to next group, until all groups have been processed.
    + *
    + * Shuffling exchange data to Reducer tasks...
    + *
    + * Stage 2: Final mode aggregate at Reducer side:
    + *
    + *  1. The framework calls `createAggregationBuffer(): T` to create an empty internal aggregation
    + *     buffer object (type T) for merging.
    + *  2. For each aggregation output of Stage 1, The framework de-serializes the storage
    + *     format and generates one input aggregation object (type T).
    + *  3. For each input aggregation object, the framework calls `merge(buffer: T, input: T): Unit`
    + *     to merge the input aggregation object into aggregation buffer object.
    + *  4. After processing all input aggregation objects of current group (group by key), the framework
    + *     calls method `eval(buffer: T)` to generate the final output for this group.
    + *  5. The framework moves on to next group, until all groups have been processed.
    + */
    +abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
    +
    +  /**
    +   * Creates an empty aggregation buffer object. This is called before processing each key group
    +   * (group by key).
    +   *
    +   * @return an aggregation buffer object
    +   */
    +  def createAggregationBuffer(): T
    +
    +  /**
    +   * In-place updates the aggregation buffer object with an input row. buffer = buffer + input.
    +   * This is typically called when doing Partial or Complete mode aggregation.
    +   *
    +   * @param buffer The aggregation buffer object.
    +   * @param input an input row
    +   */
    +  def update(buffer: T, input: InternalRow): Unit
    +
    +  /**
    +   * Merges an input aggregation object into aggregation buffer object. buffer = buffer + input.
    +   * This is typically called when doing PartialMerge or Final mode aggregation.
    +   *
    +   * @param buffer the aggregation buffer object used to store the aggregation result.
    +   * @param input an input aggregation object. Input aggregation object can be produced by
    +   *              de-serializing the partial aggregate's output from Mapper side.
    +   */
    +  def merge(buffer: T, input: T): Unit
    +
    +  /**
    +   * Generates the final aggregation result value for current key group with the aggregation buffer
    +   * object.
    +   *
    +   * @param buffer aggregation buffer object.
    +   * @return The aggregation result of current key group
    +   */
    +  def eval(buffer: T): Any
    +
    +  /** Returns the class of aggregation buffer object */
    +  def aggregationBufferClass: Class[T]
    +
    +  /** Serializes the aggregation buffer object T to Spark-sql internally supported storage format */
    +  def serialize(buffer: T): Any
    +
    +  /** De-serializes the storage format, and produces aggregation buffer object T */
    +  def deserialize(storageFormat: Any): T
    +
    +  /**
    +   * Returns the aggregation-buffer-object storage format's Sql type.
    +   *
    +   * Here is a list of supported storage format and corresponding Sql type:
    +   *
    +   * {{{
    +   *   aggregation buffer object's Storage format    |  storage format's Sql type
    +   * ------------------------------------------------------------------------------------------
    +   *   Array[Byte] (*)                               |  BinaryType (*)
    --- End diff --
    
    I don't think so. 
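
For context, the two-stage work flow described in the quoted Scaladoc can be sketched without Spark. This is only an illustration under stated assumptions, not the code from the pull request: `TypedAggregateSketch`, `SumCount`, `AvgSketch` and `WorkflowDemo` are hypothetical names, input rows are plain `Long` values rather than `InternalRow`, and the storage format is assumed to be `Array[Byte]` (the `BinaryType` case in the table above).

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for the interface in the diff. Input rows are plain
// Longs here (an assumption) so the sketch runs without Spark on the classpath.
trait TypedAggregateSketch[T] {
  def createAggregationBuffer(): T
  def update(buffer: T, input: Long): Unit
  def merge(buffer: T, input: T): Unit
  def eval(buffer: T): Any
  def serialize(buffer: T): Array[Byte]
  def deserialize(storageFormat: Array[Byte]): T
}

// An arbitrary user-defined object used as the aggregation buffer.
final class SumCount(var sum: Long, var count: Long)

object AvgSketch extends TypedAggregateSketch[SumCount] {
  def createAggregationBuffer(): SumCount = new SumCount(0L, 0L)

  // buffer = buffer + input row
  def update(buffer: SumCount, input: Long): Unit = {
    buffer.sum += input
    buffer.count += 1
  }

  // buffer = buffer + another partial buffer
  def merge(buffer: SumCount, input: SumCount): Unit = {
    buffer.sum += input.sum
    buffer.count += input.count
  }

  def eval(buffer: SumCount): Any =
    if (buffer.count == 0) null else buffer.sum.toDouble / buffer.count

  // Two 8-byte longs: sum first, then count.
  def serialize(buffer: SumCount): Array[Byte] =
    ByteBuffer.allocate(16).putLong(buffer.sum).putLong(buffer.count).array()

  def deserialize(storageFormat: Array[Byte]): SumCount = {
    val bb = ByteBuffer.wrap(storageFormat)
    new SumCount(bb.getLong(), bb.getLong())
  }
}

object WorkflowDemo {
  // Stage 1: build one partial buffer per mapper-side group and serialize it.
  def partial(rows: Seq[Long]): Array[Byte] = {
    val buffer = AvgSketch.createAggregationBuffer()
    rows.foreach(row => AvgSketch.update(buffer, row))
    AvgSketch.serialize(buffer)
  }

  // Stage 2: deserialize each partial output, merge into a fresh buffer, then eval.
  def finalAggregate(partials: Seq[Array[Byte]]): Any = {
    val buffer = AvgSketch.createAggregationBuffer()
    partials.foreach(bytes => AvgSketch.merge(buffer, AvgSketch.deserialize(bytes)))
    AvgSketch.eval(buffer)
  }

  def main(args: Array[String]): Unit = {
    val shuffled = Seq(partial(Seq(1L, 2L, 3L)), partial(Seq(4L, 6L)))
    println(finalAggregate(shuffled)) // prints 3.2
  }
}
```

The buffer object never has to fit a fixed set of primitive columns; only its serialized form crosses the shuffle boundary, which is the point the class-level comment in the diff makes.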

