[ https://issues.apache.org/jira/browse/TEZ-1265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14054516#comment-14054516 ]
Rohini Palaniswamy commented on TEZ-1265:
-----------------------------------------
Many people calculate the 95th percentile as below:
{code}
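-- Count all records in A.
grouped = group A all;
counted = foreach grouped generate COUNT_STAR(A);
-- Sort, then keep 95% of the records; the limit is a scalar expression on the count.
sorted = order A by f1 desc;
limited = limit sorted (long)(counted.$0 * 0.95);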
{code}
The number of records to limit in that case is really high (95% of the input).
Some try to use quantile UDFs (e.g. DataFu has Quantile and StreamingQuantile
UDFs). Whether they use limit or a UDF, it is always time-consuming on huge
data, and I have not been able to suggest a faster alternative to users.
Thinking about the 95th percentile problem and Sidd's suggestion made me
consider doing the limiting in the order by phase itself instead of having an
extra limit job, since that single limit task will always be slowed down by the
IO bottleneck of pulling a lot of files and writing out one big output file. If
each task in the orderby vertex had stats on the number of records produced by
the other tasks in the partitioner vertex, it could decide to terminate early.
For example, say the limit is 10K. Orderby vertex task0 gets multiple part
files totaling 5K records, task1 and task2 get 3K each, and the others get some
number. task0 and task1 will process everything, but task2 will stop after 2K,
and the rest of the tasks will not even pull input and will terminate early.
For this to work, a destination task taskN will have to wait until all
partitioner vertex source tasks from task0 to taskX (where X is the task where
the limit is reached, or taskN-1) have completed before it can determine
whether it needs to start pulling and processing. But I don't think that
limitation will make things worse. Can such a thing be done?
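To make the 10K example concrete, here is a minimal sketch in Python of the
decision each orderby task would make. It is not Tez code: the function name
quota_for_task and the idea that per-task record counts are available as a
plain list are assumptions for illustration only, and the counts for the
"other" tasks (4K here) are made up.

```python
def quota_for_task(counts_before, own_count, limit):
    """How many records a given orderby task should process.

    counts_before: record counts of the orderby tasks ahead of this one
                   (task0 .. taskN-1), assumed known from partitioner stats.
    own_count:     records routed to this task.
    limit:         the overall LIMIT.
    """
    already_covered = sum(counts_before)
    # Nothing left to emit if earlier tasks already satisfy the limit.
    return max(0, min(own_count, limit - already_covered))


limit = 10_000
print(quota_for_task([], 5_000, limit))                     # task0: 5000
print(quota_for_task([5_000], 3_000, limit))                # task1: 3000
print(quota_for_task([5_000, 3_000], 3_000, limit))         # task2: 2000
print(quota_for_task([5_000, 3_000, 3_000], 4_000, limit))  # task3: 0
```

A quota of 0 is the case where the task never pulls its input. The sketch also
shows why taskN only needs the counts for task0 to taskN-1, or fewer if the
limit is already reached at some earlier taskX.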
> Custom input to fetch source task inputs in order
> -------------------------------------------------
>
> Key: TEZ-1265
> URL: https://issues.apache.org/jira/browse/TEZ-1265
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Rohini Palaniswamy
>
> Consider the case of having to LIMIT m records after an order by. A
> distributed orderby vertex produces data in sorted order from
> task0, task1, ..., taskn. Each task limits its output to m records (the
> output count can also be less than m). The limit vertex (parallelism 1)
> following the orderby vertex has to fetch the output of all n tasks, shuffle
> merge its inputs (to maintain the order) and then limit to m records again.
> So we need an input that fetches from source tasks in order and reads them in
> order. Since the data produced is ordered from task0, task1, ..., taskn, it
> can be consumed without shuffle and sort. If the limit is hit early, it can
> skip fetching more task inputs.
> More details in
> https://issues.apache.org/jira/browse/PIG-4049?focusedCommentId=14053217&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14053217
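The ordered input described in the issue can be sketched roughly as follows.
This is an illustration in Python, not the Tez input API: read_in_task_order
and the per-task fetch callables are hypothetical names, and each callable
stands in for fetching one source task's already-sorted output.

```python
from itertools import islice


def read_in_task_order(fetchers, m):
    """Read up to m records from source tasks in task order, without merging.

    fetchers: one zero-argument callable per source task (task0 first); each
              returns an iterable over that task's sorted output.
    Returns the records read and the number of task outputs actually fetched.
    """
    records = []
    fetched = 0
    for fetch in fetchers:
        if len(records) >= m:
            break  # limit already hit: skip fetching the remaining tasks
        fetched += 1
        # Take only as many records as are still needed from this task.
        records.extend(islice(fetch(), m - len(records)))
    return records, fetched


# Three source tasks whose outputs are globally ordered (descending).
outputs = [[9, 8, 7], [6, 5], [4, 3]]
fetchers = [lambda part=part: iter(part) for part in outputs]
records, fetched = read_in_task_order(fetchers, 4)
print(records, fetched)  # [9, 8, 7, 6] 2
```

Because the source outputs are already globally ordered, concatenating them in
task order preserves the sort, so no shuffle merge is needed and the third
task's output is never fetched.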