Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76227554
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         val buf = new ArrayBuffer[InternalRow]
         val totalParts = childRDD.partitions.length
         var partsScanned = 0
    +
         while (buf.size < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
           var numPartsToTry = 1L
           if (partsScanned > 0) {
    -        // If we didn't find any rows after the first iteration, just try all partitions next.
    -        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
    -        // by 50%.
    -        if (buf.size == 0) {
    -          numPartsToTry = totalParts - 1
    +        // If we didn't find any rows after the previous iteration, quadruple and retry.
    +        // Otherwise, interpolate the number of partitions we need to try, but overestimate
    +        // it by 50%. We also cap the estimation in the end.
    +        val takeRampUpRate = sqlContext.conf.takeRampUpRate
    +        if (buf.isEmpty) {
    +          numPartsToTry = partsScanned * takeRampUpRate
    +        val takeRampUpRate = sqlContext.conf.takeRampUpRate
    +        if (buf.isEmpty) {
    +          numPartsToTry = partsScanned * takeRampUpRate
             } else {
    -          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
    +          // the left side of max is >=1 whenever partsScanned >= 2
    +          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
    +          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
             }
           }
    -      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
     
           val left = n - buf.size
          val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
           val sc = sqlContext.sparkContext
    -      val res = sc.runJob(childRDD,
    -        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
    +      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
    --- End diff ---
    
    Why change this? The childRDD is an `RDD[Array[Byte]]`; it returns an iterator with exactly one element (a byte array containing the entire partition).

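    Separately, for anyone skimming the ramp-up logic in the quoted diff, a small standalone sketch of just that arithmetic may help (takeRampUpRate = 4 is an assumed default here, and the per-partition row counts are fabricated purely to show the progression):
    
    ```scala
    object TakeRampUpSketch {
      def main(args: Array[String]): Unit = {
        val takeRampUpRate = 4L   // assumed value of sqlContext.conf.takeRampUpRate
        val totalParts = 1000
        val n = 100               // rows requested by take(n)
        var partsScanned = 0
        var bufSize = 0           // rows collected so far
        var iteration = 0
    
        while (bufSize < n && partsScanned < totalParts && iteration < 5) {
          var numPartsToTry = 1L
          if (partsScanned > 0) {
            if (bufSize == 0) {
              // No rows yet: grow the scan geometrically by the ramp-up rate.
              numPartsToTry = partsScanned * takeRampUpRate
            } else {
              // Interpolate with a 50% overestimate, then cap by the ramp-up rate.
              numPartsToTry = Math.max((1.5 * n * partsScanned / bufSize).toInt - partsScanned, 1)
              numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
            }
          }
          val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
          println(s"iteration $iteration: partitions $p")
          partsScanned += p.size
          bufSize += p.count(_ % 100 == 0)  // pretend only every 100th partition yields a row
          iteration += 1
        }
      }
    }
    ```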
