GitHub user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14723#discussion_r75586350
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---
    @@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
         def right: AttributeReference = inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
       }
     }
    +
    +/**
    + * This trait allows users to define an AggregateFunction that can store **arbitrary** Java
    + * objects in the aggregation buffer while aggregating each key group. This trait must be mixed
    + * into a class that extends ImperativeAggregate.
    + *
    + * Here is how it works in a typical aggregation flow (Partial mode aggregation on the mapper
    + * side, and Final mode aggregation on the reducer side).
    + *
    + * Stage 1: Partial aggregation on the mapper side:
    + *
    + *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, the user stores an arbitrary
    + *    empty object, say object A, in aggBuffer. Object A will be used to store the accumulated
    + *    aggregation result.
    + *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: InternalRow)` for the
    + *    current group (grouped by key), the user extracts object A from mutableAggBuffer, updates
    + *    it with the current inputRow, and stores it back into mutableAggBuffer.
    + *  1. After all rows of the current group have been processed, the framework calls
    + *    `serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to serialize
    + *    object A into a serializable format in place.
    + *  1. The framework may spill the aggregationBuffer to disk if there is not enough memory.
    + *    This is safe because the aggregationBuffer has already been converted to a serializable
    + *    format.
    + *  1. The framework moves on to the next group, until all groups have been processed.
    + *
    + * A shuffle then exchanges the data with the reducer tasks...
    + *
    + * Stage 2: Final mode aggregation on the reducer side:
    + *
    + *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, the user stores a new empty
    + *    object A1 in aggBuffer. Object A1 will be used to store the accumulated aggregation
    + *    result.
    + *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow)`,
    + *    the user extracts object A1 from mutableAggBuffer and object A2 from inputAggBuffer,
    + *    merges A1 and A2, and stores the merged result back into mutableAggBuffer.
    + *  1. After all inputAggBuffers of the current group (grouped by key) have been processed, the
    + *    framework calls `serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)`
    + *    to serialize object A1 into a serializable format in place.
    + *  1. The framework may spill the aggregationBuffer to disk if there is not enough memory.
    + *    This is safe because the aggregationBuffer has already been converted to a serializable
    + *    format.
    + *  1. The framework moves on to the next group, until all groups have been processed.
    + */
    +trait WithObjectAggregateBuffer {
    +  this: ImperativeAggregate =>
    --- End diff --
    
    Oh, it seems this trait will still be a Java `interface`. But I think that, in
    general, we do not really need this line.
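
    For readers following the flow described in the scaladoc above, here is a minimal,
    self-contained sketch of the two stages. It is an illustration only, not the real
    API: the buffer is modeled as a plain Array[Any] instead of MutableRow, the rest
    of the ImperativeAggregate contract is omitted, and the collect-distinct aggregate
    with its toy byte-array "serialization" is made up. Only the method names and the
    call order mirror the scaladoc.

        object ObjectAggBufferFlowDemo {
          import scala.collection.mutable

          type Buffer = Array[Any] // stand-in for Spark's MutableRow
          val slot = 0             // this aggregate uses a single buffer slot

          // initialize: store an arbitrary empty object ("object A") in the buffer.
          def initialize(buf: Buffer): Unit =
            buf(slot) = mutable.HashSet.empty[Int]

          // update (Partial mode): fold the current input row into object A.
          def update(buf: Buffer, input: Int): Unit =
            buf(slot).asInstanceOf[mutable.HashSet[Int]] += input

          // serializeObjectAggregationBufferInPlace: rewrite the buffer slot into a
          // serializable format before the framework may spill or shuffle the buffer.
          def serializeObjectAggregationBufferInPlace(buf: Buffer): Unit = {
            val a = buf(slot).asInstanceOf[mutable.HashSet[Int]]
            buf(slot) = a.toArray.sorted.map(_.toByte) // toy "serialization"
          }

          // merge (Final mode): combine serialized A2 (inputAggBuffer) into mutable A1.
          def merge(mutableBuf: Buffer, inputBuf: Buffer): Unit = {
            val a1 = mutableBuf(slot).asInstanceOf[mutable.HashSet[Int]]
            val a2 = inputBuf(slot).asInstanceOf[Array[Byte]].map(_.toInt)
            a1 ++= a2
          }

          def main(args: Array[String]): Unit = {
            // Stage 1: Partial aggregation on the mapper side, one group.
            val partial = new Buffer(1)
            initialize(partial)
            Seq(1, 2, 2, 3).foreach(update(partial, _))
            serializeObjectAggregationBufferInPlace(partial) // now safe to spill/shuffle

            // Stage 2: Final aggregation on the reducer side, same group.
            val result = new Buffer(1)
            initialize(result)
            merge(result, partial)
            serializeObjectAggregationBufferInPlace(result)
            println(result(slot).asInstanceOf[Array[Byte]].mkString(", ")) // 1, 2, 3
          }
        }

    The point the scaladoc stresses is the in-place serialization step: because
    serializeObjectAggregationBufferInPlace rewrites the buffer slot into a
    serializable format before any spill or shuffle happens, it is safe to hold
    arbitrary Java objects in the buffer while a group is being aggregated.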

