[ https://issues.apache.org/jira/browse/SPARK-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823005#comment-15823005 ]

Takeshi Yamamuro commented on SPARK-19222:
------------------------------------------

In those cases, isn't "sample + select" enough?
IMO, in most cases "limit" is used along with "sort" for operations like top-k searches.
In that "sort + limit" case, I think the approach you described does not work.
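For illustration, a rough sketch of the two query shapes under discussion (a hypothetical example, assuming a SparkSession `spark` and the `dest1` table from the issue):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("limit-vs-topk-sketch").getOrCreate()

    // A plain "limit": any subset of rows is acceptable, so an approximate
    // "sample + select" can often stand in for it.
    val sampled = spark.table("dest1")
      .sample(withReplacement = false, fraction = 0.001)

    // "sort + limit" (top-k): the result depends on a global ordering, so
    // candidates from every partition must be compared before the final k
    // rows are known; sampling cannot replace this.
    val topK = spark.table("dest1").orderBy(col("age").desc).limit(1000)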

> Limit Query Performance issue
> -----------------------------
>
>                 Key: SPARK-19222
>                 URL: https://issues.apache.org/jira/browse/SPARK-19222
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>         Environment: Linux/Windows
>            Reporter: Sujith
>            Priority: Minor
>
> A performance/memory bottleneck occurs in the queries mentioned below.
> Case 1:
> create table t1 as select * from dest1 limit 10000000;
> Case 2:
> create table t1 as select * from dest1 limit 1000;
> Pre-condition: partition count >= 10000
> In the above cases, the limit is added at the end of the physical plan:
> == Physical Plan ==
> ExecutedCommand
>    +- CreateHiveTableAsSelectCommand [Database: spark, TableName: t2, InsertIntoHiveTable]
>          +- GlobalLimit 10000000
>             +- LocalLimit 10000000
>                +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
>                   +- SubqueryAlias hive
>                      +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
> Issue hints:
> A possible bottleneck is the following snippet in limit.scala (spark-sql package):
>   protected override def doExecute(): RDD[InternalRow] = {
>     // Take up to `limit` rows locally from each partition.
>     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
>     // Shuffle all locally limited rows into a single partition.
>     val shuffled = new ShuffledRowRDD(
>       ShuffleExchange.prepareShuffleDependency(
>         locallyLimited, child.output, SinglePartition, serializer))
>     // Take the final `limit` rows from that single partition.
>     shuffled.mapPartitionsInternal(_.take(limit))
>   }
> In both case 1 (where the limit value is 10000000, or the partition count is > 10000)
> and case 2 (where the limit value is small, around 1000), the snippet above creates the
> ShuffledRowRDD by shuffling the locally limited data from every partition into a single
> partition on one executor. All of that per-partition limit data is collected and
> processed in that single partition, so in either case the row count can grow very large
> and cause the memory bottleneck.
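> A rough worst-case illustration with hypothetical numbers (assuming every partition can
> satisfy its local limit): with 10000 partitions and limit 1000, each partition ships up
> to 1000 rows, so the single shuffle partition may have to hold up to
> 10000 * 1000 = 10,000,000 rows, even though the final take(limit) keeps only 1000 of them.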
> Proposed solution for case 2:
> An accumulator can be shared with all partitions, and every executor updates the
> accumulator value based on the data it has fetched.
> E.g. number of partitions = 100, number of cores = 10.
> Ideally tasks are launched in groups of 10 (one task per core). Once the first group of
> tasks finishes, the driver checks whether the accumulator value has reached the limit;
> if it has, no further tasks are launched on the executors, and the result after applying
> the limit is returned. A rough sketch of this idea follows below.
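> A minimal sketch of this idea (purely illustrative, not a proposed patch; the driver-side
> runJob scheduling and names such as rowsSeen are assumptions):
>   import org.apache.spark.sql.{Row, SparkSession}
>
>   val spark = SparkSession.builder().appName("incremental-limit-sketch").getOrCreate()
>   val sc    = spark.sparkContext
>
>   val limit     = 1000
>   val groupSize = 10                             // e.g. number of cores
>   val rowsSeen  = sc.longAccumulator("rowsSeen") // hypothetical shared counter
>   val rdd       = spark.table("dest1").rdd
>
>   val collected     = scala.collection.mutable.ArrayBuffer.empty[Row]
>   var nextPartition = 0
>
>   // Schedule partitions in groups of `groupSize`; after each group finishes,
>   // the driver checks the accumulator and stops launching tasks once the
>   // limit has been reached.
>   while (nextPartition < rdd.getNumPartitions && rowsSeen.value < limit) {
>     val group = nextPartition until math.min(nextPartition + groupSize, rdd.getNumPartitions)
>     val perPartition = sc.runJob(rdd, (rows: Iterator[Row]) => {
>       val taken = rows.take(limit).toArray        // local limit inside each task
>       rowsSeen.add(taken.length.toLong)           // report progress to the driver
>       taken
>     }, group)
>     collected ++= perPartition.flatten
>     nextPartition += groupSize
>   }
>
>   val limited = collected.take(limit)             // final global limit on the driver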
> Please let me know if you have any suggestions or solutions for the above-mentioned
> problems.
> Thanks,
> Sujith


