When a limit is added at the terminal of the physical plan, there is a
possibility of a memory bottleneck if the limit value is too large, because
the system will try to aggregate the per-partition limit results into a
single partition.
Eg:
create table src_temp as select * from src limit n;
== Physical Plan ==
ExecutedCommand
   +- CreateHiveTableAsSelectCommand [Database: spark, TableName: t2, InsertIntoHiveTable]
         +- GlobalLimit 2
            +- LocalLimit 2
               +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
                  +- SubqueryAlias hive
                     +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv

As shown in the plan above, when the limit comes at the terminal of the plan
there can be two types of performance bottlenecks:
scenario 1: when the partition count is very high and the limit value is small
scenario 2: when the limit value is very large

  protected override def doExecute(): RDD[InternalRow] = {
    // Local limit: each partition keeps at most `limit` rows.
    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    // Shuffle all locally limited rows into a single partition,
    // then apply the limit once more on that partition.
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        locallyLimited, child.output, SinglePartition, serializer))
    shuffled.mapPartitionsInternal(_.take(limit))
  }

As per my understanding, the current algorithm first creates a
MapPartitionsRDD by applying the limit within each partition; a
ShuffledRowRDD is then created by shuffling the data from all partitions
into a single partition. This can create overhead: since every partition can
return up to limit rows, the single partition may have to gather up to
N partitions * limit rows, which can be very large (for example, 1000
partitions with a limit of 1000 means up to 1,000,000 rows shuffled to one
task). In both scenarios mentioned above this logic can be a bottleneck.
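For illustration, here is a minimal sketch of this behaviour using the public
RDD API instead of the physical operators; the partition count, the limit
value and the repartition(1) call are my own assumptions that only
approximate the SinglePartition shuffle done by ShuffleExchange.

import org.apache.spark.sql.SparkSession

object GlobalLimitOverheadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("global-limit-overhead-sketch")
      .master("local[4]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical sizes: 1000 partitions and limit 1000.
    val numPartitions = 1000
    val limit = 1000
    val data = sc.parallelize(1 to 10000000, numPartitions)

    // Step 1 (LocalLimit): every partition keeps up to `limit` rows.
    val locallyLimited = data.mapPartitions(_.take(limit))

    // Step 2 (GlobalLimit): everything is shuffled into one partition and the
    // limit is applied again. Up to numPartitions * limit rows
    // (here 1,000,000) reach a single task, although only `limit` rows
    // are actually needed.
    val globallyLimited =
      locallyLimited.repartition(1).mapPartitions(_.take(limit))

    println(globallyLimited.count())  // 1000
    spark.stop()
  }
}

The real operator shuffles with SinglePartition rather than repartition(1),
but the data volume arriving at the single reducer is the same.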

My suggestion for handling scenario 1, where there is a large number of
partitions and the limit value is small: the driver can create an
accumulator and send it to all partitions, and every executor updates the
accumulator based on the number of rows it has fetched.
Eg: number of partitions = 100, number of cores = 10
Tasks will be launched in groups of 10 (10 * 10 = 100). Once the first group
of tasks finishes, the driver checks whether the accumulator value has
reached the limit value; if it has, no further tasks are launched to the
executors and the result is returned. A rough sketch of this idea is shown
below.
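Here is a driver-side sketch of the idea, built on the public RDD API rather
than the physical operators; the helper name incrementalLimit, the groupSize
parameter and the exact accumulator usage are illustrative assumptions,
similar in spirit to how RDD.take scans a few partitions at a time.

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object IncrementalLimitSketch {
  // Hypothetical helper: scan partitions in groups and stop launching tasks
  // once the accumulated row count reaches the limit.
  def incrementalLimit[T: ClassTag](
      sc: SparkContext, rdd: RDD[T], limit: Int, groupSize: Int): Array[T] = {
    val collected = new ArrayBuffer[T]
    val rowsSeen = sc.longAccumulator("rowsSeen")  // updated by the executors
    val totalPartitions = rdd.getNumPartitions
    var nextPartition = 0

    // The driver checks the accumulator after each group of tasks and only
    // launches the next group if the limit has not been reached yet.
    while (rowsSeen.sum < limit && nextPartition < totalPartitions) {
      val partitionIds =
        nextPartition until math.min(nextPartition + groupSize, totalPartitions)

      val results = sc.runJob(rdd, (it: Iterator[T]) => {
        val rows = it.take(limit).toArray
        rowsSeen.add(rows.length)                  // report how many rows were fetched
        rows
      }, partitionIds)

      results.foreach(collected ++= _)
      nextPartition += groupSize
    }
    collected.take(limit).toArray
  }
}

One trade-off is that each group is a separate job, so for a very large limit
the existing single shuffle may still be cheaper; this sketch only targets
scenario 1.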

Let me know if you have any further suggestions or solutions.

Thanks in advance,
Sujith
