Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22621#discussion_r222330998 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) val localIdx = ctx.freshName("localIdx") val localEnd = ctx.freshName("localEnd") val range = ctx.freshName("range") - val shouldStop = if (parent.needStopCheck) { - s"if (shouldStop()) { $number = $value + ${step}L; return; }" + + val processingLoop = if (parent.needStopCheck) { + // TODO (cloud-fan): do we really need to do the stop check within batch? + s""" + |int $localIdx = 0; + |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) { + | long $value = $nextIndex; + | ${consume(ctx, Seq(ev))} + | $nextIndex += ${step}L; + |} + |$numOutput.add($localIdx); + |$inputMetrics.incRecordsRead($localIdx); + """.stripMargin + } else { + s""" + |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { + | long $value = ((long)$localIdx * ${step}L) + $nextIndex; + | ${consume(ctx, Seq(ev))} + |} + |$nextIndex = $batchEnd; + |$numOutput.add($localEnd); + |$inputMetrics.incRecordsRead($localEnd); + """.stripMargin + } + + val loopCondition = if (parent.needStopCheck) { + "!shouldStop()" } else { - "// shouldStop check is eliminated" + "true" } + + // An overview of the Range processing. + // + // For each partition, the Range task needs to produce records from partition start(inclusive) + // to end(exclusive). For better performance, we separate the partition range into batches, and + // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner + // for loop is used to iterate records inside a batch. + // + // `nextIndex` tracks the index of the next record that is going to be consumed, initialized + // with partition start. 
`batchEnd` tracks the end index of the current batch, initialized + // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero. + // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex` + // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next + // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is + // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner + // loop but just update the `batchEnd`. + // + // The inner loop iterates from 0 to `localEnd`, which is calculated by + // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in + // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always + // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends + // up being equal to `batchEnd` when the inner loop finishes. + // + // The inner loop can be interrupted, if the query has produced at least one result row, so that + // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop, + // because `nextIndex` is updated per loop iteration and remembers how far we have processed. + s""" | // initialize Range | if (!$initTerm) { | $initTerm = true; | $initRangeFuncName(partitionIndex); | } | - | while (true) { - | long $range = $batchEnd - $number; + | while ($loopCondition) { + | long $range = $batchEnd - $nextIndex; | if ($range != 0L) { | int $localEnd = (int)($range / ${step}L); - | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { - | long $value = ((long)$localIdx * ${step}L) + $number; - | ${consume(ctx, Seq(ev))} - | $shouldStop + | $processingLoop + | } else { + | long $nextBatchTodo; --- End diff -- Why did you move these lines into the else branch?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org