[
https://issues.apache.org/jira/browse/PIG-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14602536#comment-14602536
]
Remi Catherinot commented on PIG-4612:
--------------------------------------
Since several logical plan optimizer rules can be grouped under the same
rule-set name, I prefer to keep the limit case and the filter case in 2
separate JIRAs. Even if the purpose in both cases is to trigger the use of the
combiner, the code won't be the same and is likely to be very different.
Trying to put the two together will lead to hard-to-understand code.
We'll see later on whether some portions can be merged. I'll take a look at
the limit case though, and try my best to write code that can be re-used in
both cases.
Thanks for the info :)
> accumulating upon filters is still accumulating
> -----------------------------------------------
>
> Key: PIG-4612
> URL: https://issues.apache.org/jira/browse/PIG-4612
> Project: Pig
> Issue Type: Improvement
> Components: internal-udfs
> Affects Versions: 0.15.0
> Environment: I use YARN, not Tez or Spark, but I think the problem
> also exists in those environments
> Reporter: Remi Catherinot
> Labels: performance
>
> Accumulators are not used when accumulating filter results. Here is a script
> with no filter, which ends up with a map-combine-reduce plan that makes
> efficient use of the Accumulator design.
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     GENERATE MAX(A.b) AS accumulated;
> }
> If I put a filter in and apply MAX to its result, I end up with a map-reduce
> plan (no combine) which first generates whole bags of the filtered elements
> and then feeds those bags to the reducers. That requires more memory, so more
> spills are needed, which consume IO, and more CPU is needed to handle all this.
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     C = FILTER A BY c == 'toto';
>     GENERATE MAX(C.b) AS not_accumulated_just_reduced;
> }
> In my production environment, I have some jobs that take hours to run, with
> memory-hungry containers, and that still do a lot of spill-to-disk. If I hack
> the filter into the MAX accumulator, the job finishes in 5 to 10
> minutes. I think it is possible to develop a PlanOptimizer that would
> rewrite the 2nd script into something like this in a generic way:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
> }
> The filtered accumulator would be an accumulator itself and wrap any other
> accumulator, forwarding a value to be accumulated only if the filter's eval
> function returns true for it.
> This idea can also work for DISTINCT and the like; filtered accumulators can
> wrap each other in layers.
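
To make the wrapping idea in the description concrete, here is a rough sketch in plain Python. It is not Pig code: the class names, the method names (loosely modeled on accumulate / getValue / cleanup) and the row layout (a, b, c) are assumptions made up for illustration, and a real implementation would sit behind Pig's Java Accumulator interface and be wired in by an optimizer rule.

```python
from typing import Any, Callable, Iterable, Optional


class MaxAccumulator:
    """Hypothetical accumulator: keeps a running max of one field, never the rows."""

    def __init__(self, field: int) -> None:
        self.field = field
        self.current: Optional[Any] = None

    def accumulate(self, batch: Iterable[tuple]) -> None:
        for row in batch:
            value = row[self.field]
            if value is not None and (self.current is None or value > self.current):
                self.current = value

    def get_value(self) -> Optional[Any]:
        return self.current

    def cleanup(self) -> None:
        self.current = None


class FilteredAccumulator:
    """Hypothetical wrapper: forwards only the rows accepted by the predicate."""

    def __init__(self, predicate: Callable[[tuple], bool], inner: Any) -> None:
        self.predicate = predicate
        self.inner = inner

    def accumulate(self, batch: Iterable[tuple]) -> None:
        # Generator expression: rows are filtered lazily, one at a time,
        # so no intermediate "filtered bag" is ever materialized.
        self.inner.accumulate(row for row in batch if self.predicate(row))

    def get_value(self) -> Any:
        return self.inner.get_value()

    def cleanup(self) -> None:
        self.inner.cleanup()


def demo() -> Optional[int]:
    # Rows shaped like (a, b, c), fed in two batches as an accumulator would be.
    acc = FilteredAccumulator(lambda r: r[2] == "toto", MaxAccumulator(field=1))
    acc.accumulate([("k", 3, "toto"), ("k", 9, "other")])
    acc.accumulate([("k", 5, "toto")])
    return acc.get_value()
```

Because the wrapper exposes the same three methods as what it wraps, one FilteredAccumulator can wrap another, which is the layering mentioned in the description.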
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)