Repository: spark Updated Branches: refs/heads/master ca70ab27c -> 4e8ac6edd
[SPARK-15735] Allow specifying min time to run in microbenchmarks ## What changes were proposed in this pull request? This makes microbenchmarks run for at least 2 seconds by default, to allow some time for jit compilation to kick in. ## How was this patch tested? Tested manually with existing microbenchmarks. This change is backwards compatible in that existing microbenchmarks which specified numIters per-case will still run exactly that number of iterations. Microbenchmarks which previously overrode defaultNumIters now override minNumIters. cc hvanhovell Author: Eric Liang <e...@databricks.com> Author: Eric Liang <ekhli...@gmail.com> Closes #13472 from ericl/spark-15735. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e8ac6ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e8ac6ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e8ac6ed Branch: refs/heads/master Commit: 4e8ac6edd5808ca8245b39d804c6d4f5ea9d0d36 Parents: ca70ab2 Author: Eric Liang <e...@databricks.com> Authored: Wed Jun 8 16:21:41 2016 -0700 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Wed Jun 8 16:21:41 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Benchmark.scala | 109 ++++++++++++------- 1 file changed, 72 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e8ac6ed/core/src/main/scala/org/apache/spark/util/Benchmark.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala index 0c685b1..7def44b 100644 --- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala +++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala @@ -17,10 +17,14 @@ package org.apache.spark.util +import java.io.{OutputStream, PrintStream} + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ import scala.util.Try +import org.apache.commons.io.output.TeeOutputStream import org.apache.commons.lang3.SystemUtils /** @@ -33,18 +37,37 @@ import org.apache.commons.lang3.SystemUtils * * The benchmark function takes one argument that is the iteration that's being run. * - * If outputPerIteration is true, the timing for each run will be printed to stdout. + * @param name name of this benchmark. + * @param valuesPerIteration number of values used in the test case, used to compute rows/s. + * @param minNumIters the min number of iterations that will be run per case, not counting warm-up. + * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up. + * @param minTime further iterations will be run for each case until this time is used up. + * @param outputPerIteration if true, the timing for each run will be printed to stdout. + * @param output optional output stream to write benchmark results to */ private[spark] class Benchmark( name: String, valuesPerIteration: Long, - defaultNumIters: Int = 5, - outputPerIteration: Boolean = false) { + minNumIters: Int = 2, + warmupTime: FiniteDuration = 2.seconds, + minTime: FiniteDuration = 2.seconds, + outputPerIteration: Boolean = false, + output: Option[OutputStream] = None) { + import Benchmark._ val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case] + val out = if (output.isDefined) { + new PrintStream(new TeeOutputStream(System.out, output.get)) + } else { + System.out + } + /** * Adds a case to run when run() is called. The given function will be run for several * iterations to collect timing statistics. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run */ def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = { addTimerCase(name, numIters) { timer => @@ -58,9 +81,12 @@ private[spark] class Benchmark( * Adds a case with manual timing control. When the function is run, timing does not start * until timer.startTiming() is called within the given function. The corresponding * timer.stopTiming() method must be called before the function returns. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run */ def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = { - benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters else numIters) + benchmarks += Benchmark.Case(name, f, numIters) } /** @@ -75,28 +101,63 @@ private[spark] class Benchmark( val results = benchmarks.map { c => println(" Running case: " + c.name) - Benchmark.measure(valuesPerIteration, c.numIters, outputPerIteration)(c.fn) + measure(valuesPerIteration, c.numIters)(c.fn) } println val firstBest = results.head.bestMs // The results are going to be processor specific so it is useful to include that. - println(Benchmark.getJVMOSInfo()) - println(Benchmark.getProcessorName()) - printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)", + out.println(Benchmark.getJVMOSInfo()) + out.println(Benchmark.getProcessorName()) + out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)", "Per Row(ns)", "Relative") - println("-" * 96) + out.println("-" * 96) results.zip(benchmarks).foreach { case (result, benchmark) => - printf("%-40s %16s %12s %13s %10s\n", + out.printf("%-40s %16s %12s %13s %10s\n", benchmark.name, "%5.0f / %4.0f" format (result.bestMs, result.avgMs), "%10.1f" format result.bestRate, "%6.1f" format (1000 / result.bestRate), "%3.1fX" format (firstBest / result.bestMs)) } - println + out.println // scalastyle:on } + + /** + * Runs a single function `f` for iters, returning the average time the function took and + * the rate of the function. + */ + def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = { + System.gc() // ensures garbage from previous cases don't impact this one + val warmupDeadline = warmupTime.fromNow + while (!warmupDeadline.isOverdue) { + f(new Benchmark.Timer(-1)) + } + val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters + val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos + val runTimes = ArrayBuffer[Long]() + var i = 0 + while (i < minIters || runTimes.sum < minDuration) { + val timer = new Benchmark.Timer(i) + f(timer) + val runTime = timer.totalTime() + runTimes += runTime + + if (outputPerIteration) { + // scalastyle:off + println(s"Iteration $i took ${runTime / 1000} microseconds") + // scalastyle:on + } + i += 1 + } + // scalastyle:off + println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms") + // scalastyle:on + val best = runTimes.min + val avg = runTimes.sum / runTimes.size + Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0) + } } private[spark] object Benchmark { @@ -161,30 +222,4 @@ private[spark] object Benchmark { val osVersion = System.getProperty("os.version") s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}" } - - /** - * Runs a single function `f` for iters, returning the average time the function took and - * the rate of the function. - */ - def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Timer => Unit): Result = { - val runTimes = ArrayBuffer[Long]() - for (i <- 0 until iters + 1) { - val timer = new Benchmark.Timer(i) - f(timer) - val runTime = timer.totalTime() - if (i > 0) { - runTimes += runTime - } - - if (outputPerIteration) { - // scalastyle:off - println(s"Iteration $i took ${runTime / 1000} microseconds") - // scalastyle:on - } - } - val best = runTimes.min - val avg = runTimes.sum / iters - Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0) - } } - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org