How about this:

1. We can make LocalLimit shuffle to multiple partitions, i.e. create a new partitioner to uniformly dispatch the data:
    import org.apache.spark.Partitioner

    class LimitUniformPartitioner(partitions: Int) extends Partitioner {

      def numPartitions: Int = partitions

      var num = 0

      def getPartition(key: Any): Int = {
        num = num + 1
        num % partitions
      }

      override def equals(other: Any): Boolean = other match {
        // Compare against the same partitioner type (the original draft
        // matched HashPartitioner, which looks like a copy-paste slip).
        case p: LimitUniformPartitioner => p.numPartitions == numPartitions
        case _ => false
      }

      override def hashCode: Int = numPartitions
    }

2. Then in GlobalLimit, we only take the first limit_number / num_of_shuffle_partitions elements in each partition.

One issue left is how to decide the shuffle partition number. We can have a config for the maximum number of elements each GlobalLimit task should process, then do a factorization to get a partition number closest to that config. E.g. if the config is 2000:

if limit = 10000, then 10000 = 2000 * 5, so we shuffle to 5 partitions
if limit = 9999, then 9999 = 1111 * 9, so we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition

best regards,
-zhenhua

-----Original Message-----
From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
Sent: January 18, 2017, 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion

Hi Sujith,

I saw your updated post. It makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would be a performance bottleneck, even though it can eventually shuffle enough data to the single partition.

Unlike `CollectLimit`, I actually think there is no reason `GlobalLimit` must shuffle all limited data from all partitions to one single machine with respect to query execution. In other words, I think we can avoid shuffling data in `GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.

sujith71955 wrote
> Dear Liang,
>
> Thanks for your valuable feedback.
>
> There was a mistake in the previous post; I corrected it. As you
> mentioned, in `GlobalLimit` we will only take the required number of
> rows from the input iterator, which really pulls data from local blocks
> and remote blocks.
> But if the limit value is very high (>= 10000000), a shuffle exchange
> happens between `GlobalLimit` and `LocalLimit` to retrieve data from
> all partitions into one partition; since the limit value is very large,
> the performance bottleneck still exists.
>
> Soon, in the next post, I will publish a test report with sample data
> and also a proposed solution for this problem.
>
> Please let me know of any clarifications or suggestions regarding
> this issue.
>
> Regards,
> Sujith

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20652.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
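As a footnote to zhenhua's proposal above: the partition-count heuristic (pick a factorization of the limit so that each GlobalLimit task handles at most a configured number of elements, falling back to a single partition for primes) could be sketched roughly as follows. This is only an illustration; `LimitPartitioning`, `choosePartitions`, and `maxPerTask` are hypothetical names, not Spark API.

```scala
// Sketch of the proposed heuristic: pick the largest divisor of `limit`
// that does not exceed `maxPerTask` (the configured per-task maximum),
// and shuffle to limit / divisor partitions. If no divisor > 1 fits
// (e.g. the limit is prime), fall back to a single partition.
object LimitPartitioning {
  def choosePartitions(limit: Int, maxPerTask: Int): Int = {
    val bestDivisor = (2 to math.min(maxPerTask, limit))
      .filter(limit % _ == 0)   // divisors of `limit` within the budget
      .lastOption               // the largest, i.e. closest to maxPerTask
    bestDivisor match {
      case Some(perTask) => limit / perTask
      case None          => 1   // no fitting divisor: single partition
    }
  }
}
```

With maxPerTask = 2000 this reproduces the examples from the mail: limit 10000 gives 5 partitions (2000 * 5) and limit 9999 gives 9 partitions (1111 * 9).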