When a limit is added at the terminal of the physical plan, there is a possibility of a memory bottleneck if the limit value is too large, because the system will try to aggregate the limit results of all partitions into a single partition.

Description:

Eg: create table src_temp as select * from src limit n;

== Physical Plan ==
ExecutedCommand
+- CreateHiveTableAsSelectCommand [Database:spark, TableName: t2, InsertIntoHiveTable]
   +- GlobalLimit 2
      +- LocalLimit 2
         +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
            +- SubqueryAlias hive
               +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
As shown in the above plan, when the limit comes at the terminal there can be two types of performance bottleneck:

scenario 1: when the partition count is very high and the limit value is small
scenario 2: when the limit value is very large

  protected override def doExecute(): RDD[InternalRow] = {
    // Take up to `limit` rows from each partition locally.
    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    // Shuffle all the locally limited partitions into a single partition.
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        locallyLimited, child.output, SinglePartition, serializer))
    // Apply the limit once more on the single merged partition.
    shuffled.mapPartitionsInternal(_.take(limit))
  }

As per my understanding, the current algorithm first creates a MapPartitionsRDD by applying the limit to each partition; then a ShuffledRowRDD is created by grouping the data from all partitions into a single partition. This can create overhead, since every partition can return up to `limit` rows, so the shuffle may move as many as N partitions * limit rows, which can be very large (for example, 1000 partitions with limit 1,000,000 could shuffle up to 10^9 rows into one partition). In both scenarios mentioned above this logic can be a bottleneck.

My suggestion for handling scenario 1, where there is a large number of partitions and the limit value is small: the driver can create an accumulator and send it to all partitions, and every executor updates the accumulator based on the number of rows it has fetched. Eg: with number of partitions = 100 and number of cores = 10, tasks will be launched in waves of 10 (10 * 10 = 100); once the first wave of tasks finishes, the driver checks whether the accumulator value has reached the limit value, and if it has, no further tasks are launched to the executors and the result is returned. A rough sketch of this idea is shown below.

Let me know of any further suggestions or solutions.

Thanks in advance,
Sujith
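To make the wave-by-wave idea concrete, here is a minimal sketch, assuming the driver counts rows after each wave finishes rather than reading a shared accumulator mid-flight (the control flow is the same). The helper name collectLimitIncrementally and the waveSize parameter are hypothetical, not part of Spark; Spark's own RDD.take() uses a similar incremental runJob pattern.

  import scala.collection.mutable.ArrayBuffer
  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Hypothetical helper: evaluate a terminal limit by launching tasks in
  // waves of `waveSize` partitions, stopping as soon as `limit` rows have
  // been collected, instead of shuffling every partition's rows into a
  // single partition.
  def collectLimitIncrementally[T: ClassTag](
      sc: SparkContext,
      rdd: RDD[T],
      limit: Int,
      waveSize: Int): Array[T] = {
    val results = new ArrayBuffer[T]
    var next = 0
    val total = rdd.getNumPartitions
    while (results.size < limit && next < total) {
      // Launch one wave of tasks over the next `waveSize` partitions only.
      val wave = next until math.min(next + waveSize, total)
      val waveResults = sc.runJob(
        rdd,
        (iter: Iterator[T]) => iter.take(limit).toArray, // local limit per task
        wave)
      // This driver-side check plays the role of the accumulator: once the
      // limit is satisfied after a wave, no further tasks are launched.
      for (part <- waveResults if results.size < limit) {
        results ++= part.take(limit - results.size)
      }
      next += waveSize
    }
    results.toArray
  }

With 100 partitions, 10 cores, and a small limit, the first wave of 10 tasks would usually already satisfy the limit, so the remaining 90 tasks are never launched at all.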