[ https://issues.apache.org/jira/browse/PIG-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14603465#comment-14603465 ]

Rohini Palaniswamy commented on PIG-4612:
-----------------------------------------

I still believe that you should fix CombinerOptimizer rather than write a new
LogicalPlanOptimizer rule. CombinerOptimizer gives you more control over
changing the plan and more scope to handle many different conditions and
combinations, e.g. LIMIT + FILTER, ORDER + LIMIT + FILTER, etc. inside the
nested foreach. I doubt it would be possible to handle combinations of
operators like those inside a nested foreach with logical optimizer rules.

> accumulating upon filters is still accumulating
> -----------------------------------------------
>
>                 Key: PIG-4612
>                 URL: https://issues.apache.org/jira/browse/PIG-4612
>             Project: Pig
>          Issue Type: Improvement
>          Components: internal-udfs
>    Affects Versions: 0.15.0
>         Environment: I use YARN, not Tez or Spark, but I think the problem
> also exists in those environments
>            Reporter: Remi Catherinot
>              Labels: performance
>
> Accumulators are not used when accumulating filter results. Here is a script
> with no filters which ends up with a map-combine-reduce plan that
> efficiently uses the Accumulator design:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    GENERATE MAX(A.b) AS accumulated;
> }
> If I put a filter and apply MAX to it, I end up with a map-reduce plan (no
> combine) which first builds whole bags of the filtered elements and then feeds
> those bags to the reducers. That requires more memory, so more spills are
> needed, which consume IO, and more CPU is needed to handle all this:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    C = FILTER A BY c == 'toto';
>    GENERATE MAX(C.b) AS not_accumulated_just_reduced;
> }
> In my production environment, I have some jobs that take hours to run, with
> memory-hungry containers, and they still spill to disk a lot. If I hack the
> filter into the MAX accumulator, the job finishes in 5 to 10 minutes. I think
> it is possible to develop a PlanOptimizer that would rewrite the second script
> into something like this, in a generic way:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
> }
> The filtered accumulator would itself be an accumulator that wraps any other
> accumulator, forwarding a value to be accumulated only if the filter's eval
> function returns true.
> This idea can also work for DISTINCT and the like; filtered accumulators can
> wrap each other in a layered way.
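A minimal Java sketch of the wrapping idea in the description, for illustration only. It assumes a hypothetical, simplified `SimpleAccumulator` interface that mirrors only the accumulate/getValue/cleanup shape of Pig's `Accumulator`; Pig's real interface receives Tuples containing bags, and none of the class names below exist in Pig. The filter wrapper forwards a row only when its predicate holds, so the filtered bag is never built; the distinct wrapper, by contrast, has to remember every key it has seen in the current group.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for the shape of Pig's Accumulator (not Pig's API):
// rows are fed one at a time instead of as bags inside Tuples.
interface SimpleAccumulator<R, T> {
    void accumulate(R row); // consume one row of the current group
    T getValue();           // result for the current group
    void cleanup();         // reset state before the next group
}

// One input row, matching the schema (a:chararray, b:long, c:chararray).
final class Row {
    final String a; final Long b; final String c;
    Row(String a, Long b, String c) { this.a = a; this.b = b; this.c = c; }
}

// MAX over one column; null if no non-null value was accumulated.
final class MaxAccumulator<R> implements SimpleAccumulator<R, Long> {
    private final Function<? super R, Long> column;
    private Long max = null;
    MaxAccumulator(Function<? super R, Long> column) { this.column = column; }
    public void accumulate(R row) {
        Long v = column.apply(row);
        if (v != null && (max == null || v > max)) max = v;
    }
    public Long getValue() { return max; }
    public void cleanup() { max = null; }
}

// COUNT of the rows forwarded to it.
final class CountAccumulator<R> implements SimpleAccumulator<R, Long> {
    private long n = 0;
    public void accumulate(R row) { n++; }
    public Long getValue() { return n; }
    public void cleanup() { n = 0; }
}

// Forwards a row to the wrapped accumulator only if the predicate holds.
final class FilteredAccumulator<R, T> implements SimpleAccumulator<R, T> {
    private final Predicate<? super R> keep;
    private final SimpleAccumulator<R, T> inner;
    FilteredAccumulator(Predicate<? super R> keep, SimpleAccumulator<R, T> inner) {
        this.keep = keep; this.inner = inner;
    }
    public void accumulate(R row) { if (keep.test(row)) inner.accumulate(row); }
    public T getValue() { return inner.getValue(); }
    public void cleanup() { inner.cleanup(); }
}

// Forwards a row only the first time its key appears in the current group.
final class DistinctAccumulator<R, T> implements SimpleAccumulator<R, T> {
    private final Function<? super R, ?> key;
    private final SimpleAccumulator<R, T> inner;
    private final Set<Object> seen = new HashSet<>();
    DistinctAccumulator(Function<? super R, ?> key, SimpleAccumulator<R, T> inner) {
        this.key = key; this.inner = inner;
    }
    public void accumulate(R row) { if (seen.add(key.apply(row))) inner.accumulate(row); }
    public T getValue() { return inner.getValue(); }
    public void cleanup() { seen.clear(); inner.cleanup(); }
}

class FilteredAccumulatorDemo {
    // Streams one group's rows through an accumulator, then resets it.
    static <T> T run(SimpleAccumulator<Row, T> acc, List<Row> group) {
        for (Row r : group) acc.accumulate(r);
        T result = acc.getValue();
        acc.cleanup();
        return result;
    }

    public static void main(String[] args) {
        List<Row> group = Arrays.asList(
            new Row("k", 5L, "toto"), new Row("k", 9L, "other"),
            new Row("k", 7L, "toto"), new Row("k", 7L, "toto"));
        // Like: C = FILTER A BY c == 'toto'; GENERATE MAX(C.b);
        SimpleAccumulator<Row, Long> max = new FilteredAccumulator<Row, Long>(
            r -> "toto".equals(r.c), new MaxAccumulator<Row>(r -> r.b));
        // Layered: COUNT of distinct b among rows where c == 'toto'.
        SimpleAccumulator<Row, Long> distinctCount = new FilteredAccumulator<Row, Long>(
            r -> "toto".equals(r.c),
            new DistinctAccumulator<Row, Long>(r -> r.b, new CountAccumulator<Row>()));
        System.out.println(run(max, group));           // 7
        System.out.println(run(distinctCount, group)); // 2
    }
}
```

Because each wrapper is itself a `SimpleAccumulator`, wrappers compose in any order; the filter wrapper adds no per-group state, whereas the distinct wrapper's memory grows with the number of distinct keys in a group.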



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)