[ https://issues.apache.org/jira/browse/PIG-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13103054#comment-13103054 ]
Thejas M Nair commented on PIG-2228:
------------------------------------

Here are the results of the performance benchmark I ran.
The summary: There is an improvement of up to 50% in map run time with in-map partial aggregation. But if there is not enough reduction in the number of map output records, there is a performance degradation. It makes sense to turn off this feature in that case.

The input data was generated from the pigmix page_views data with 40M records, so that the large map and bag fields, which were not used in the tests, are not included. The query used to create the input data -
{code}
A = load '/user/pig/tests/pigmix/page_views'
    using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
    as (user : chararray, action : int, timespent : int, query_term : chararray, ip_addr : chararray, timestamp : long);
store A into '/user/pig/tests/pigmix/page_views/pv_part';
{code}

The map average runtime printed at end of

||Query || Change in number of records (Map input -> Map output -> Reduce input) || Map average runtime improvement wrt trunk || Group keys || Projection in foreach after group-by ||
| group on many columns (groupall.pig) | 40M -> 39.98M -> 39.79M | -14% | user, action, query_term, ip_addr, timestamp | user, action, qt, ip, ts, AVG(B.timespent); |
| group on single column with high cardinality (singlegroup_user.pig) | 40M -> 12M -> 6.2M | 8% | user | user, AVG(B.timespent) |
| group on single column with low cardinality (singlegroup_action.pig) | 40M -> 38 -> 38 | 38.6% | action | action, AVG(B.timespent) |
| group-all (group_allstar.pig) | 40M -> 10 -> 10 | 52% | 'all' | MAX(B.timestamp), SUM(B.action), AVG(B.timespent) |

Note: I used a different split size for different queries so that the map runtime is around 1 minute (long enough to make time measurements more meaningful, but short enough for multiple runs). The number of maps varied from 9 to 20 in most cases.
The same number of maps was used when the same query was run with the map-agg setting on and off.

I also ran tests using input data compressed with bzip2. The performance numbers were different, although the trend was the same. The reduction in the number of map output records needed to justify map partial aggregation was also different: singlegroup_user.pig, which showed an 8% performance improvement with uncompressed data, had a 3.5% performance degradation with bzip2 input.

> support partial aggregation in map task
> ---------------------------------------
>
> Key: PIG-2228
> URL: https://issues.apache.org/jira/browse/PIG-2228
> Project: Pig
> Issue Type: Bug
> Reporter: Thejas M Nair
> Assignee: Thejas M Nair
> Fix For: 0.10
>
> Attachments: PIG-2228.1.patch, PIG-2228.2.patch, PIG-2228.3.patch
>
>
> h3. Introduction
> Pig does (sort based) partial aggregation in map side through the use of
> combiner. MR serializes the output of map to a buffer, sorts it on the keys,
> deserializes and passes the values grouped on the keys to combiner phase. The
> same work of combiner can be done in the map phase itself by using a hash-map
> on the keys. This hash based (partial) aggregation can be done with or
> without a combiner phase.
> h3. Benefits
> It will send fewer records to combiner and thereby -
> * Save on cost of serializing and de-serializing
> * Save on cost of lock calls on the combiner input buffer. (I have found
> this to be a significant cost for a query that was doing multiple group-by's
> in a single MR job. -Thejas)
> * The problem of running out of memory in reduce side, for queries like
> COUNT(distinct col) can be avoided. The OOM issue happens because very large
> records get created after the combiner run on merged reduce input. In case of
> combiner, you have no way of telling MR not to combine records in reduce
> side. The workaround is to disable combiner completely, and the opportunity
> to reduce map output size is lost.
> * When the foreach after group-by has both algebraic and non-algebraic
> functions, or if a bag is being projected, the combiner is not used. This is
> because the data size reduction in typical cases is not significant enough
> to justify the additional (de)serialization costs. But hash based aggregation
> can be used in such cases as well.
> * It is possible to turn off the in-map combine automatically if there is
> not enough 'combination' that is taking place to justify the overhead of the
> in-map combiner. (Idea borrowed from Hive jira.)
> * If input data is sorted, it is possible to do efficient map side
> (partial) aggregation with in-map combiner.
> Design proposal is here -
> https://cwiki.apache.org/confluence/display/PIG/PigInMapCombinerProposal
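The hash based partial aggregation described in the Introduction, together with the automatic turn-off mentioned in the Benefits list, can be illustrated with a small sketch. This is not the code in the attached patches. It is a minimal Python model for an AVG-style aggregate, and the names `partial_aggregate`, `final_average`, `check_after` and `min_reduction` are hypothetical, chosen only for illustration. It also ignores memory limits and spilling, which a real implementation would have to handle.

```python
def partial_aggregate(records, check_after=1000, min_reduction=2.0):
    """Map-side sketch: fold (key, value) pairs into per-key (sum, count).

    check_after and min_reduction are hypothetical knobs for this sketch.
    After check_after input records, the ratio of input records to distinct
    keys is checked once. If it is below min_reduction, aggregation is
    switched off and the remaining records pass through unaggregated.
    """
    partials = {}        # key -> [running sum, running count]
    seen = 0
    aggregating = True
    for key, value in records:
        if not aggregating:
            # Aggregation was turned off: emit a trivial partial record.
            yield key, (value, 1)
            continue
        seen += 1
        acc = partials.get(key)
        if acc is None:
            partials[key] = [value, 1]
        else:
            acc[0] += value
            acc[1] += 1
        if seen == check_after and seen / len(partials) < min_reduction:
            # Not enough reduction: flush what we have and stop hashing.
            aggregating = False
            for k, (s, c) in partials.items():
                yield k, (s, c)
            partials.clear()
    # End of input: emit whatever is still held in the hash map.
    for k, (s, c) in partials.items():
        yield k, (s, c)


def final_average(partial_records):
    """Reduce-side sketch: merge (sum, count) partials into per-key averages."""
    totals = {}
    for key, (s, c) in partial_records:
        acc = totals.setdefault(key, [0, 0])
        acc[0] += s
        acc[1] += c
    return {k: s / c for k, (s, c) in totals.items()}
```

With a low-cardinality key, as in singlegroup_action.pig, almost all input records collapse into a handful of partial records. With a key that is nearly unique per record, as in groupall.pig, the check fails and the sketch falls back to passing records through, which mirrors the case where the benchmark showed a degradation.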