GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/15590

    [SPARK-17949][SQL] A Java object based aggregate operator

    ## What changes were proposed in this pull request?
    
    This PR adds a new hash-based aggregate operator named 
`ObjectHashAggregateExec` that supports `TypedImperativeAggregate`, which may 
use arbitrary Java objects as aggregation states. Please refer to the [design 
doc][1] attached to [SPARK-17949][2] for more details.
    
    [1]: https://issues.apache.org/jira/secure/attachment/12834260/%5BDesign%20Doc%5D%20Support%20for%20Arbitrary%20Aggregation%20States.pdf
    [2]: https://issues.apache.org/jira/browse/SPARK-17949
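
To make the idea of an object-based aggregation state concrete, here is a minimal Python sketch, not code from this patch. The class and method names (`CollectSet`, `create_buffer`, `update`, `merge`, `serialize`, `deserialize`, `evaluate`) are hypothetical and only mirror the general shape of such an aggregate; the use of `pickle` to ship a buffer between stages is likewise an assumption made for illustration.

```python
import pickle


class CollectSet:
    """Toy aggregate whose buffer is an ordinary object (a Python set)."""

    def create_buffer(self):
        # The state is a plain object, not a fixed-width row of primitives.
        return set()

    def update(self, buffer, value):
        # Fold one input value into the buffer.
        buffer.add(value)
        return buffer

    def merge(self, buffer, other):
        # Combine two partial buffers for the same group.
        buffer |= other
        return buffer

    def serialize(self, buffer):
        # Assumed step: turn the object into bytes before it leaves the task.
        return pickle.dumps(buffer)

    def deserialize(self, data):
        return pickle.loads(data)

    def evaluate(self, buffer):
        # Produce the final result from the buffer.
        return sorted(buffer)


agg = CollectSet()

left = agg.create_buffer()
for v in [3, 1, 3]:
    left = agg.update(left, v)

right = agg.create_buffer()
for v in [2, 1]:
    right = agg.update(right, v)

# Simulate sending the second partial buffer to another stage.
shipped = agg.deserialize(agg.serialize(right))
result = agg.evaluate(agg.merge(left, shipped))
print(result)
```

The point of the sketch is only that the buffer can be any object, as long as it can be updated, merged, and turned into a result.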
    
    The major benefit of this operator is better performance when evaluating 
`TypedImperativeAggregate` functions, especially when there are relatively few 
distinct groups. Functions like Hive UDAFs, `collect_list`, and `collect_set` 
may also benefit from this after being migrated to `TypedImperativeAggregate`.
    
    The following feature flag is introduced to enable or disable the new 
aggregate operator:
    
    - Name: `spark.sql.execution.useObjectHashAggregateExec`
    - Default value: `true`
    
    We can also configure the fallback threshold using the following SQL 
configuration option:
    
    - Name: `spark.sql.objectHashAggregate.sortBased.fallbackThreshold`
    - Default value: 128
    
      Falls back to sort-based aggregation when more than 128 distinct groups are 
accumulated in the aggregation hash map. This number is intentionally made 
small to avoid GC problems since aggregation buffers of this operator may 
contain arbitrary Java objects.
    
      This may be improved by implementing size tracking for this operator, but 
that can be done in a separate PR.
    
    Code generation and size tracking are planned to be implemented in 
follow-up PRs.
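
The fallback rule described above can be sketched roughly as follows. This is a simplified Python model, not the operator's actual implementation: the function names are hypothetical, everything is held in memory, and the "sort-based" phase simply sorts the remaining input and merges each group with whatever partial buffer the hash map had already built.

```python
from itertools import groupby


def sort_based_merge(partial, remaining, create, update, merge):
    """Finish the aggregation by sorting the remaining rows by key."""
    result = dict(partial)
    remaining.sort(key=lambda kv: kv[0])
    for key, group in groupby(remaining, key=lambda kv: kv[0]):
        buf = create()
        for _key, value in group:
            buf = update(buf, value)
        # Merge with the partial buffer from the hash phase, if there is one.
        result[key] = merge(result[key], buf) if key in result else buf
    return result


def aggregate_with_fallback(rows, threshold, create, update, merge):
    """Hash-aggregate (key, value) rows; returns (buffers, fell_back)."""
    buffers = {}
    stream = iter(rows)
    for key, value in stream:
        if key not in buffers and len(buffers) >= threshold:
            # A new group would push the map past the threshold: fall back.
            remaining = [(key, value), *stream]
            merged = sort_based_merge(buffers, remaining, create, update, merge)
            return merged, True
        buf = buffers[key] if key in buffers else create()
        buffers[key] = update(buf, value)
    return buffers, False


# A count aggregate over 5 distinct groups, 4 rows each.
rows = [(i % 5, i) for i in range(20)]
count = dict(
    create=lambda: 0,
    update=lambda buf, _value: buf + 1,
    merge=lambda a, b: a + b,
)

small, fell_back_small = aggregate_with_fallback(rows, threshold=3, **count)
large, fell_back_large = aggregate_with_fallback(rows, threshold=128, **count)
print(small, fell_back_small)
print(large, fell_back_large)
```

Both calls produce the same counts; only the path taken differs, which is the property the threshold is meant to preserve.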
    
    ## Benchmark results
    
    ### `ObjectHashAggregateExec` vs `SortAggregateExec`
    
    The first benchmark compares `ObjectHashAggregateExec` and 
`SortAggregateExec` by evaluating `typed_count`, a testing 
`TypedImperativeAggregate` version of the SQL `count` function.
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    
    object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
    object agg w/ group by w/o fallback           6903 / 7141         15.2          65.8       4.5X
    object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
    sort agg w/o group by                         4734 / 5463         22.1          45.2       6.6X
    object agg w/o group by w/o fallback          4310 / 4529         24.3          41.1       7.3X
    ```
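
For readers unfamiliar with this output format, the `Relative` column appears to be the best time of the first row divided by the best time of each row. That reading is an assumption, but it reproduces the printed values, as this small Python check shows:

```python
# Best times in ms, copied from the benchmark table above.
best_ms = {
    "sort agg w/ group by": 31251,
    "object agg w/ group by w/o fallback": 6903,
    "object agg w/ group by w/ fallback": 20945,
    "sort agg w/o group by": 4734,
    "object agg w/o group by w/o fallback": 4310,
}

# Assumed definition: speedup relative to the first (baseline) row.
baseline = best_ms["sort agg w/ group by"]
relative = {name: round(baseline / ms, 1) for name, ms in best_ms.items()}

for name, speedup in relative.items():
    print(f"{name:<40} {speedup:.1f}X")
```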
    
    The next benchmark compares `ObjectHashAggregateExec` and 
`SortAggregateExec` by evaluating the Spark native version of 
`percentile_approx`.
    
    Note that `percentile_approx` is such a heavy aggregate function that the 
bottleneck of the benchmark is evaluating the aggregate function itself rather 
than the aggregate operator. That's why the results are so close and look 
counter-intuitive (aggregation with group by is even faster than 
aggregation without group by).
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    
    object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort agg w/ group by                          3418 / 3530          0.6        1630.0       1.0X
    object agg w/ group by w/o fallback           3210 / 3314          0.7        1530.7       1.1X
    object agg w/ group by w/ fallback            3419 / 3511          0.6        1630.1       1.0X
    sort agg w/o group by                         4336 / 4499          0.5        2067.3       0.8X
    object agg w/o group by w/o fallback          4271 / 4372          0.5        2036.7       0.8X
    ```
    
    ### Hive UDAF vs Spark AF
    
    This benchmark compares the following two kinds of aggregate functions:
    
    - "hive udaf": Hive implementation of `percentile_approx`, without partial 
aggregation support, evaluated using `SortAggregateExec`.
    - "spark af": Spark native implementation of `percentile_approx`, with 
partial aggregation support, evaluated using `ObjectHashAggregateExec`.
    
    The performance difference is mostly due to the faster implementation and 
partial aggregation support in the Spark native version of `percentile_approx`.
    
    This benchmark basically shows the performance difference between the 
worst case, where an aggregate function without partial aggregation support is 
evaluated using `SortAggregateExec`, and the best case, where a 
`TypedImperativeAggregate` with partial aggregation support is evaluated using 
`ObjectHashAggregateExec`.
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    
    hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    hive udaf w/o group by                        5326 / 5408          0.0       81264.2       1.0X
    spark af w/o group by                           93 /  111          0.7        1415.6      57.4X
    hive udaf w/ group by                         3804 / 3946          0.0       58050.1       1.4X
    spark af w/ group by w/o fallback               71 /   90          0.9        1085.7      74.8X
    spark af w/ group by w/ fallback                98 /  111          0.7        1501.6      54.1X
    ```
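
Since partial aggregation accounts for much of the gap above, here is a minimal Python sketch of what it means. It is not Spark code: the function names and the `(count, total)` buffer are hypothetical, and an average stands in for `percentile_approx` to keep the example short. Each partition is first reduced to one buffer per group, and only those buffers are merged in the final step.

```python
from collections import defaultdict


def partial_aggregate(partition):
    """Map side: build one [count, total] buffer per group in a partition."""
    buffers = defaultdict(lambda: [0, 0.0])
    for key, value in partition:
        buf = buffers[key]
        buf[0] += 1
        buf[1] += value
    return dict(buffers)


def final_aggregate(partials):
    """Reduce side: merge the partial buffers and evaluate the average."""
    merged = defaultdict(lambda: [0, 0.0])
    for buffers in partials:
        for key, (count, total) in buffers.items():
            merged[key][0] += count
            merged[key][1] += total
    return {key: total / count for key, (count, total) in merged.items()}


partitions = [
    [("a", 1.0), ("a", 3.0), ("b", 10.0)],
    [("a", 5.0), ("b", 20.0), ("b", 30.0)],
]

partials = [partial_aggregate(p) for p in partitions]
averages = final_aggregate(partials)

# Records that would cross the partition boundary in each strategy.
rows_without_partial = sum(len(p) for p in partitions)
buffers_with_partial = sum(len(p) for p in partials)
print(averages, rows_without_partial, buffers_with_partial)
```

With only two groups, the final step sees four buffers instead of six rows; the saving grows with the number of rows per group, which is why a function without partial aggregation support is the worst case.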
    
    ## How was this patch tested?
    
    New unit tests and randomized test cases are added in 
`ObjectAggregateFunctionSuite`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark obj-hash-agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15590
    
----
commit c8ec18b37410cfeaad226f57e66b88b3d465f09a
Author: Cheng Lian <l...@databricks.com>
Date:   2016-08-04T07:54:32Z

    Initial commit for the new ObjectHashAggregateExec operator

----

