[
https://issues.apache.org/jira/browse/SPARK-21964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-21964.
----------------------------------
Resolution: Incomplete
> Enable splitting the Aggregate (on Expand) into a number of Aggregates for
> grouping analytics
> --------------------------------------------------------------------------------------------
>
> Key: SPARK-21964
> URL: https://issues.apache.org/jira/browse/SPARK-21964
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.0.0, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Reporter: Feng Zhu
> Priority: Major
> Labels: bulk-closed
> Attachments: OOMRetry.png, Union.png, before.png
>
>
> In current versions, Spark SQL implements grouping analytics clauses (i.e.,
> cube, rollup and grouping sets) as a single Aggregate operator on a single
> Expand operator. With this implementation, we can read the table only once.
> However, for many scenarios (e.g., high-dimensional cubes), the Expand
> operator is too "heavy", with a large number of projections resulting in
> vast shuffle write.
> In our production environment, we have encountered many such cases, leading
> to poor performance or even OOM errors in direct buffer memory. We
> demonstrate the issue with the following real-world query over a
> 6-dimensional cube.
>
> {code:sql}
> SELECT CASE WHEN grouping(iFrom) = 1 THEN -1 ELSE iFrom END AS iFrom
>       ,CASE WHEN grouping(iSrcId) = 1 THEN -1 ELSE iSrcId END AS iSrcId
>       ,CASE WHEN grouping(sgametype) = 1 THEN '-1' ELSE sgametype END AS sgametype
>       ,CASE WHEN grouping(iOperId) = 1 THEN -1 ELSE iOperId END AS iOperId
>       ,CASE WHEN grouping(igameid) = 1 THEN -1 ELSE igameid END AS igameid
>       ,CASE WHEN grouping(iacttypeid) = 1 THEN -1 ELSE iacttypeid END AS iacttypeid
>       ,SUM(iclickcnt) AS iclickcnt
> FROM p_day_advert
> WHERE statedate = 20170810
> GROUP BY iFrom, iSrcId, sgametype, iOperId, igameid, iacttypeid WITH CUBE
> {code}
> For such a query, the Expand operator generates 64 (i.e., 2^6) projections.
> Though the query reads only about 3GB of data, it produces about 250GB of
> shuffle write. In our environment, the first stage takes about 2 hours.
> !https://issues.apache.org/jira/secure/attachment/12886247/before.png!
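The 64-projection blow-up follows directly from the cube's dimensionality: a CUBE over n columns expands into all 2^n grouping sets, and Expand emits one projected copy of every input row per set. A minimal sketch in plain Python (column names taken from the query above; this only enumerates the sets, it is not Spark code):

```python
from itertools import combinations

def cube_grouping_sets(cols):
    """All subsets of the grouped columns: the grouping sets a CUBE produces."""
    return [subset for k in range(len(cols) + 1)
            for subset in combinations(cols, k)]

cols = ["iFrom", "iSrcId", "sgametype", "iOperId", "igameid", "iacttypeid"]
sets = cube_grouping_sets(cols)
# Expand replicates each input row once per grouping set, so the shuffle
# input is ~64x the scanned data before any aggregation happens.
print(len(sets))  # 64
```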
> The second stage easily hits an OOM error unless we enlarge some
> configurations.
> !https://issues.apache.org/jira/secure/attachment/12886246/OOMRetry.png!
> Therefore, we propose to provide another choice, which enables splitting the
> heavyweight Aggregate into a number of lightweight Aggregates, one per
> grouping set. In effect, it implements the grouping analytics as a Union and
> executes the aggregates one by one. Though it reads the data multiple times,
> we can still achieve higher overall performance. With this implementation,
> the query completes in about 20 minutes, with each aggregation taking
> 1~4 minutes.
> !https://issues.apache.org/jira/secure/attachment/12886245/Union.png!
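The rewrite can be sketched on a toy two-dimensional cube (table and data are hypothetical, and SQLite is used purely for illustration): each grouping set becomes its own lightweight GROUP BY, with -1 standing in for the rolled-up dimension as in the query above, and the per-set results are unioned:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE p_day_advert (iFrom INTEGER, iSrcId INTEGER, iclickcnt INTEGER)")
con.executemany("INSERT INTO p_day_advert VALUES (?, ?, ?)",
                [(1, 10, 5), (1, 20, 3), (2, 10, 7)])

# A 2-dimensional cube has 2^2 = 4 grouping sets; instead of one Aggregate
# over a 4-way Expand, run one small aggregate per set and union the results.
rows = con.execute("""
    SELECT iFrom, iSrcId, SUM(iclickcnt) FROM p_day_advert GROUP BY iFrom, iSrcId
    UNION ALL
    SELECT iFrom, -1, SUM(iclickcnt) FROM p_day_advert GROUP BY iFrom
    UNION ALL
    SELECT -1, iSrcId, SUM(iclickcnt) FROM p_day_advert GROUP BY iSrcId
    UNION ALL
    SELECT -1, -1, SUM(iclickcnt) FROM p_day_advert
""").fetchall()

print(sorted(rows))
```

In Spark itself this would correspond to replacing the single Aggregate-on-Expand plan with a union of per-grouping-set aggregates, trading extra table scans for far smaller shuffle writes.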
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]