Hi Sujith,

Thanks for the suggestion.
The code you quoted is from `CollectLimitExec`, which appears in the physical plan only when a logical `Limit` is the final operator of the logical plan. In the physical plan you showed, the logical `Limit` is planned as `GlobalLimit` plus `LocalLimit`, so the `doExecute` method of `CollectLimitExec` will not be executed.

In the case where `CollectLimitExec` is the final physical operator, its `executeCollect` is executed instead; it delegates to `SparkPlan.executeTake`, which is optimized to retrieve only the required number of rows back to the driver. So using `limit n` with a huge partition count should not be a problem there.

In the case where `GlobalLimit` and `LocalLimit` are the final physical operators, your concern is that when each of `N` partitions returns `n` rows and `N` is huge, the `n * N` rows in total will cause heavy memory pressure on the driver. I am not sure whether you have actually observed this problem or only suspect it might be one. In this case there is a shuffle exchange between `GlobalLimit` and `LocalLimit` to move the data from all partitions into one partition. In `GlobalLimit` we take only the required number of rows from the input iterator, and it is this iterator that actually pulls data from local and remote blocks. Because of this iterator-based approach, I think that once `GlobalLimit` has enough rows, it will not keep consuming the input iterator and pulling more data back. So I don't think your concern will be a problem.
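To make the `executeTake` point concrete, here is a minimal self-contained sketch of the incremental strategy it uses (an illustration of the idea only, not the actual Spark source; `ExecuteTakeSketch`, its `take` method, and the Seq-of-Seq stand-in for partitions are all mine): run over a small batch of partitions first, and scan more, growing the batch each round, only if we still have fewer than `limit` rows.

```scala
// Simplified sketch of the idea behind SparkPlan.executeTake: scan a few
// partitions at a time and stop as soon as `limit` rows are collected,
// instead of bringing every partition's rows back to the driver.
// (ExecuteTakeSketch and its in-memory "partitions" are illustrative only.)
object ExecuteTakeSketch {
  def take(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[Int]
    var partsScanned = 0
    var numPartsToTry = 1 // start small; grow if the first rounds fall short
    while (buf.size < limit && partsScanned < partitions.size) {
      // In Spark this would be a job submitted on just these partitions.
      val batch = partitions.slice(partsScanned, partsScanned + numPartsToTry)
      buf ++= batch.flatten.take(limit - buf.size)
      partsScanned += numPartsToTry
      numPartsToTry *= 4 // scan more partitions per round if still short
    }
    buf.toSeq
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq.tabulate(1000)(i => Seq(i)) // 1000 partitions, 1 row each
    // Only a handful of the 1000 partitions are ever touched:
    println(take(parts, 3))
  }
}
```

The point is that with a small `limit n` over a huge number of partitions, the driver's work is proportional to `n`, not to the partition count.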
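Similarly, for the `GlobalLimit` path, the argument above rests on `Iterator.take` being lazy: the final `_.take(limit)` never drains the post-shuffle input iterator. A standalone demonstration of that property (the `pulled` counter is mine, standing in for rows fetched from shuffle blocks):

```scala
// Demonstrates that Iterator.take(n) is lazy: it never consumes more than
// n elements from the underlying iterator. This is why the final
// _.take(limit) in GlobalLimit does not drain the shuffled input.
object LazyTakeDemo {
  def main(args: Array[String]): Unit = {
    var pulled = 0
    // Stand-in for the post-shuffle row iterator; `pulled` counts how many
    // rows are actually fetched from it.
    val rows: Iterator[Int] = Iterator.from(1).map { i => pulled += 1; i }
    val limited = rows.take(5).toList // GlobalLimit-style take
    println(s"rows returned: $limited, rows pulled from input: $pulled")
    // Prints: rows returned: List(1, 2, 3, 4, 5), rows pulled from input: 5
  }
}
```

In the real exchange the data is fetched from shuffle blocks in chunks, so somewhat more than `limit` rows may cross the wire, but the iterator is never fully consumed, so the `n * N` blow-up does not materialize on the single post-shuffle partition.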
sujith71955 wrote:

> When the limit is added at the terminal of the physical plan, there is a
> possibility of a memory bottleneck if the limit value is too large,
> because the system will try to aggregate all of the per-partition limit
> results into a single partition.
>
> Description:
> Eg:
> create table src_temp as select * from src limit n;
> == Physical Plan ==
> ExecutedCommand
>    +- CreateHiveTableAsSelectCommand [Database: spark, TableName: t2, InsertIntoHiveTable]
>       +- GlobalLimit 2
>          +- LocalLimit 2
>             +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
>                +- SubqueryAlias hive
>                   +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
>
> As shown in the plan above, when the limit comes at the terminal there can
> be two kinds of performance bottleneck:
> scenario 1: the partition count is very high and the limit value is small
> scenario 2: the limit value is very large
>
> protected override def doExecute(): RDD[InternalRow] = {
>   val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
>   val shuffled = new ShuffledRowRDD(
>     ShuffleExchange.prepareShuffleDependency(
>       locallyLimited, child.output, SinglePartition, serializer))
>   shuffled.mapPartitionsInternal(_.take(limit))
> }
>
> As per my understanding, the current algorithm first creates a
> MapPartitionsRDD by applying the limit to each partition; then a
> ShuffledRowRDD is created by grouping the data from all partitions into a
> single partition. This can create overhead, since every partition returns
> up to `limit` rows, so while grouping there can be N partitions * limit
> rows, which can be very large. In both scenarios mentioned above this
> logic can be a bottleneck.
>
> My suggestion for handling scenario 1, where the number of partitions is
> large and the limit value is small: the driver can create an accumulator
> and send it to all partitions, and each executor updates the accumulator
> based on the data it has fetched.
> eg: number of partitions = 100, number of cores = 10
> Tasks will be launched in groups of 10 (10 * 10 = 100). Once the first
> group of tasks finishes, the driver checks whether the accumulator value
> has reached the limit; if it has, no further tasks are launched to the
> executors and the result is returned.
>
> Let me know of any further suggestions or solutions.
>
> Thanks in advance,
> Sujith

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/