Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15596#discussion_r85264043
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -27,32 +27,14 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
     import org.apache.spark.util.Utils
     
     /**
    - * Take the first `limit` elements and collect them to a single partition.
    - *
    - * This operator will be used when a logical `Limit` operation is the final operator in an
    - * logical plan, which happens when the user is collecting results back to the driver.
    - */
    -case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
    -  override def output: Seq[Attribute] = child.output
    -  override def outputPartitioning: Partitioning = SinglePartition
    -  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
    -  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
    -    val shuffled = new ShuffledRowRDD(
    -      ShuffleExchange.prepareShuffleDependency(
    -        locallyLimited, child.output, SinglePartition, serializer))
    -    shuffled.mapPartitionsInternal(_.take(limit))
    -  }
    -}
    -
    -/**
      * Helper trait which defines methods that are shared by both
      * [[LocalLimitExec]] and [[GlobalLimitExec]].
      */
     trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       val limit: Int
       override def output: Seq[Attribute] = child.output
    +  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
    +  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(limit)
     
    --- End diff --
    
    Thanks @pwoody! Agreed. However, I am inclined not to replace `CollectLimitExec` with `GlobalLimitExec`; the reason is in my comment below. Let's wait for @JoshRosen's response. If we decide to keep `CollectLimitExec`, your change in #15614 can be applied then.
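    For context, here is a minimal sketch in plain Scala of the two-phase limit that `CollectLimitExec` performs. This is not Spark's actual operator code (no RDDs, no shuffle); partitions are modeled as plain `Seq`s purely to illustrate the local-take-then-global-take pattern in the deleted `doExecute`:

    ```scala
    object TwoPhaseLimit {
      // Two-phase limit: first keep at most `limit` rows per partition
      // (the "local limit"), then concatenate the partial results into a
      // single partition and take `limit` again (the "global limit").
      def collectLimit[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] = {
        // Local phase: each partition independently keeps at most `limit` rows.
        val locallyLimited = partitions.map(_.take(limit))
        // Global phase: "shuffle" all partial results to one partition
        // and apply the limit once more.
        locallyLimited.flatten.take(limit)
      }

      def main(args: Array[String]): Unit = {
        val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))
        println(collectLimit(parts, 4).mkString(","))  // prints 1,2,3,4
      }
    }
    ```

    The local phase matters because it bounds how many rows each task ships across the shuffle, regardless of partition size.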

