GitHub user hvanhovell commented on a diff in the pull request:
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
    - * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
    + * Take the `limit` elements of the child output.
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
       override def outputPartitioning: Partitioning = child.outputPartitioning
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    -  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
    +  private val serializer: Serializer = new 
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = 
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across 
    +    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick up rows from
    +    // each partition.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
    --- End diff --
    `select * from table order by a limit 10` gets planned differently, right? 
It should use `TakeOrderedAndProjectExec`.
    There is nothing in the SQL standard that mandates that a nested order by 
followed by a limit has to respect that ordering clause. In fact, AFAIR, the 
standard does not even support nested limits (they make stuff 
    If we end up supporting this, then I'd rather have an explicit flag in 
`GlobalLimitExec` (`orderedLimit` or something like that) and set that during 
planning by matching on `Limit(limit, Sort(order, true, child))`. I want the 
explicit flag because then we can figure out what limit is doing by looking at 
the physical plan. I want to explicitly check for an underlying sort to match 
the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior 
because something way down the plan has set some arbitrary ordering.
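
A minimal sketch of the explicit-flag idea, under loud assumptions: the plan nodes below are toy stand-ins rather than the real Catalyst or Spark SQL classes, `orderedLimit` is only the hypothetical name floated above, and `LimitPlanning.plan` is an illustrative helper, not an existing Spark strategy. It only shows the flag being set when a global `Sort` sits directly under the `Limit`, and left unset when an ordering comes from further down the plan.

```scala
// Toy logical plan nodes (assumption: simplified stand-ins, not Catalyst classes).
sealed trait LogicalPlan
case class Relation(name: String) extends LogicalPlan
case class Sort(order: Seq[String], global: Boolean, child: LogicalPlan) extends LogicalPlan
case class Limit(limit: Int, child: LogicalPlan) extends LogicalPlan

// Toy physical plan nodes (assumption: simplified stand-ins, not Spark's exec nodes).
sealed trait PhysicalPlan
case class ScanExec(name: String) extends PhysicalPlan
case class SortExec(order: Seq[String], global: Boolean, child: PhysicalPlan) extends PhysicalPlan
// `orderedLimit` is the hypothetical flag: it is visible in the physical plan,
// so a reader can tell which limit behavior was chosen.
case class GlobalLimitExec(limit: Int, child: PhysicalPlan, orderedLimit: Boolean = false)
  extends PhysicalPlan

object LimitPlanning {
  def plan(p: LogicalPlan): PhysicalPlan = p match {
    // Set the flag only when a *global* sort is the limit's direct child.
    case Limit(n, sort @ Sort(_, true, _)) =>
      GlobalLimitExec(n, plan(sort), orderedLimit = true)
    // Any other child: an ordering deeper in the plan does not set the flag.
    case Limit(n, child) =>
      GlobalLimitExec(n, plan(child))
    case Sort(order, global, child) =>
      SortExec(order, global, plan(child))
    case Relation(name) =>
      ScanExec(name)
  }
}
```

With this shape, `LimitPlanning.plan(Limit(10, Sort(Seq("a"), true, Relation("t"))))` yields a `GlobalLimitExec` with `orderedLimit = true`, while wrapping that plan in another `Limit` leaves the outer node's flag `false`, because the sort is no longer its direct child.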

