GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222726409
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
         } else {
           "// shouldStop check is eliminated"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start (inclusive)
    +    // to end (exclusive). For better performance, we separate the partition range into batches,
    +    // and use 2 loops to produce data. The outer while loop is used to iterate batches, and the
    +    // inner for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's true,
    +    // it means the current batch is fully consumed, and we will update `batchEnd` to process the
    +    // next batch. If `batchEnd` has reached partition end, we exit the outer loop. Finally, we
    +    // enter the inner loop. Note that, when we enter the inner loop, `nextIndex` must be different
    +    // from `batchEnd`, otherwise the outer loop would have already exited.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is initialized with `nextIndex` and is
    +    // increased by `nextBatchTodo * step` in the outer loop, `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted if the query has produced at least one result row, so that
    +    // we don't buffer too many result rows and waste memory. It's OK to interrupt the inner loop
    +    // because `nextIndex` is updated before interrupting.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    -      |   if ($range != 0L) {
    -      |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      | while (true$keepProducingDataCond) {
    +      |   if ($nextIndex == $batchEnd) {
    +      |     long $nextBatchTodo;
    +      |     if ($numElementsTodo > ${batchSize}L) {
    +      |       $nextBatchTodo = ${batchSize}L;
    +      |       $numElementsTodo -= ${batchSize}L;
    +      |     } else {
    +      |       $nextBatchTodo = $numElementsTodo;
    +      |       $numElementsTodo = 0;
    +      |       if ($nextBatchTodo == 0) break;
           |     }
    -      |     $number = $batchEnd;
    +      |     $numOutput.add($nextBatchTodo);
    +      |     $inputMetrics.incRecordsRead($nextBatchTodo);
    +      |     $batchEnd += $nextBatchTodo * ${step}L;
           |   }
           |
    -      |   $taskContext.killTaskIfInterrupted();
    -      |
    -      |   long $nextBatchTodo;
    -      |   if ($numElementsTodo > ${batchSize}L) {
    -      |     $nextBatchTodo = ${batchSize}L;
    -      |     $numElementsTodo -= ${batchSize}L;
    -      |   } else {
    -      |     $nextBatchTodo = $numElementsTodo;
    -      |     $numElementsTodo = 0;
    -      |     if ($nextBatchTodo == 0) break;
    +      |   int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
    --- End diff --
    
    The change here simply moves the inner loop after the `batchEnd` and
    metrics update, so that the metrics are correct when we stop early
    because of a limit.
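
    To make the reordering concrete, here is a small Java model of the
    generated loop. It is a hand-written sketch, not Spark code: the class
    `RangeLoopSketch`, its methods, the `limit` field (a stand-in for the
    `keepProducingDataCond` check) and the `maxBuffered` parameter (a stand-in
    for `shouldStop()`) are all hypothetical. The diff is cut off before the
    end of the new inner loop, so the sketch assumes it finishes with
    `nextIndex = batchEnd`, as the code comment describes. The old ordering is
    reconstructed from the removed lines, assuming the metrics were updated
    right after choosing `nextBatchTodo`, with the same limit check added at
    the top so the two can be compared; `killTaskIfInterrupted()` is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-written model of the Java loop that RangeExec generates; not Spark code.
// Variable names mirror the generated code. `limit` is a hypothetical stand-in for
// keepProducingDataCond, and `maxBuffered` for the shouldStop() check.
class RangeLoopSketch {
    final long step, batchSize, limit;
    long nextIndex, batchEnd, numElementsTodo;
    long recordsRead = 0;                          // models numOutput / inputMetrics
    final List<Long> output = new ArrayList<>();   // rows handed to consume(...)

    RangeLoopSketch(long start, long step, long numElements, long batchSize, long limit) {
        this.step = step;
        this.batchSize = batchSize;
        this.limit = limit;
        this.nextIndex = start;                    // initialized with partition start
        this.batchEnd = start;                     // initialized with nextIndex
        this.numElementsTodo = numElements;
    }

    // Claims the next batch and records its metrics; false once the partition is exhausted.
    private boolean advanceBatch() {
        long nextBatchTodo;
        if (numElementsTodo > batchSize) {
            nextBatchTodo = batchSize;
            numElementsTodo -= batchSize;
        } else {
            nextBatchTodo = numElementsTodo;
            numElementsTodo = 0;
            if (nextBatchTodo == 0) return false;
        }
        recordsRead += nextBatchTodo;
        batchEnd += nextBatchTodo * step;
        return true;
    }

    // New ordering: update batchEnd and metrics first, then run the inner loop.
    void processNextNew(int maxBuffered) {
        int buffered = 0;
        while (output.size() < limit) {
            if (nextIndex == batchEnd && !advanceBatch()) break;
            int localEnd = (int) ((batchEnd - nextIndex) / step);
            for (int localIdx = 0; localIdx < localEnd; localIdx++) {
                long value = ((long) localIdx * step) + nextIndex;
                output.add(value);
                if (++buffered >= maxBuffered) {   // shouldStop(): save progress, then return
                    nextIndex = value + step;
                    return;
                }
            }
            nextIndex = batchEnd;                  // assumed: the batch is fully consumed
        }
    }

    // Old ordering plus the same limit check: inner loop first, then batchEnd and metrics.
    // shouldStop() is omitted here to keep the comparison focused on the metrics.
    void processNextOld() {
        while (output.size() < limit) {
            long range = batchEnd - nextIndex;
            if (range != 0L) {
                int localEnd = (int) (range / step);
                for (int localIdx = 0; localIdx < localEnd; localIdx++) {
                    output.add(((long) localIdx * step) + nextIndex);
                }
                nextIndex = batchEnd;
            }
            if (!advanceBatch()) break;            // counts a batch before the limit is rechecked
        }
    }

    public static void main(String[] args) {
        RangeLoopSketch next = new RangeLoopSketch(0, 1, 100, 4, 4);
        next.processNextNew(Integer.MAX_VALUE);
        RangeLoopSketch prev = new RangeLoopSketch(0, 1, 100, 4, 4);
        prev.processNextOld();
        System.out.println("new: rows=" + next.output.size() + " recordsRead=" + next.recordsRead);
        System.out.println("old: rows=" + prev.output.size() + " recordsRead=" + prev.recordsRead);
        // prints "new: rows=4 recordsRead=4" then "old: rows=4 recordsRead=8"
    }
}
```

    With a limit of 4 and batches of 4 records, both orderings produce 4
    rows, but the old ordering claims and counts the next batch after the
    inner loop, before the limit check at the top of the loop runs again, so
    `recordsRead` ends up at 8 instead of 4. Calling `processNextNew` with a
    small `maxBuffered` also shows that saving `nextIndex` before returning
    lets a later call resume exactly where the previous one stopped.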

