Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16677#discussion_r212830045
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
     }
     
     /**
    - * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
    + * Take the `limit` elements of the child output.
      */
    -case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
    +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +  override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    -}
     
    -/**
    - * Take the first `limit` elements of the child's single output partition.
    - */
    -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
    +  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
     
    -  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
    +  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
     
    -  override def outputPartitioning: Partitioning = child.outputPartitioning
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val childRDD = child.execute()
    +    val partitioner = LocalPartitioning(childRDD)
    +    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
    +      childRDD, child.output, partitioner, serializer)
    +    val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
    +      // submitMapStage does not accept RDD with 0 partition.
    +      // So, we will not submit this dependency.
    +      val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
    +      submittedStageFuture.get().recordsByPartitionId.toSeq
    +    } else {
    +      Nil
    +    }
     
    -  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
    +    // During global limit, try to evenly distribute limited rows across 
data
    +    // partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
    +    // Besides, if child output has certain ordering, we can't evenly pick 
up rows from
    +    // each parititon.
    +    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
    --- End diff --
    
    `select * from table order by a limit 10` gets planned differently right? 
It should use `TakeOrderedAndProjectExec`.
    
    There is nothing in the SQL standard that mandates that a nested order by 
followed by a limit has to respect that ordering clause. In fact, AFAIR, the 
standard does not even support nested limits (they make stuff 
non-deterministic).
    
    If we end up supporting this, then I'd rather have an explicit flag in 
`GlobalLimitExec` (`orderedLimit` or something like that) and set that during 
planning by matching on `Limit(limit, Sort(order, true, child))`. I want the 
explicit flag because then we can figure out what limit is doing by looking at 
the physical plan. I want to explicitly check for an underlying sort to match 
the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior 
because something way down the plan has set some arbitrary ordering.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to