Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b2778c8bb -> 2c39b9a5d


[SPARK-15735] Allow specifying min time to run in microbenchmarks

## What changes were proposed in this pull request?

This makes microbenchmarks run for at least 2 seconds by default, to allow some 
time for jit compilation to kick in.

## How was this patch tested?

Tested manually with existing microbenchmarks. This change is backwards 
compatible in that existing microbenchmarks which specified numIters per-case 
will still run exactly that number of iterations. Microbenchmarks which 
previously overrode defaultNumIters now override minNumIters.

cc hvanhovell

Author: Eric Liang <e...@databricks.com>
Author: Eric Liang <ekhli...@gmail.com>

Closes #13472 from ericl/spark-15735.

(cherry picked from commit 4e8ac6edd5808ca8245b39d804c6d4f5ea9d0d36)
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c39b9a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c39b9a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c39b9a5

Branch: refs/heads/branch-2.0
Commit: 2c39b9a5d84219dbd315a93bc6d529dc72fa0a88
Parents: b2778c8
Author: Eric Liang <e...@databricks.com>
Authored: Wed Jun 8 16:21:41 2016 -0700
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Wed Jun 8 16:22:05 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Benchmark.scala | 109 ++++++++++++-------
 1 file changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c39b9a5/core/src/main/scala/org/apache/spark/util/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala 
b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 0c685b1..7def44b 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark.util
 
+import java.io.{OutputStream, PrintStream}
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 import scala.util.Try
 
+import org.apache.commons.io.output.TeeOutputStream
 import org.apache.commons.lang3.SystemUtils
 
 /**
@@ -33,18 +37,37 @@ import org.apache.commons.lang3.SystemUtils
  *
  * The benchmark function takes one argument that is the iteration that's 
being run.
  *
- * If outputPerIteration is true, the timing for each run will be printed to 
stdout.
+ * @param name name of this benchmark.
+ * @param valuesPerIteration number of values used in the test case, used to 
compute rows/s.
+ * @param minNumIters the min number of iterations that will be run per case, 
not counting warm-up.
+ * @param warmupTime amount of time to spend running dummy case iterations for 
JIT warm-up.
+ * @param minTime further iterations will be run for each case until this time 
is used up.
+ * @param outputPerIteration if true, the timing for each run will be printed 
to stdout.
+ * @param output optional output stream to write benchmark results to
  */
 private[spark] class Benchmark(
     name: String,
     valuesPerIteration: Long,
-    defaultNumIters: Int = 5,
-    outputPerIteration: Boolean = false) {
+    minNumIters: Int = 2,
+    warmupTime: FiniteDuration = 2.seconds,
+    minTime: FiniteDuration = 2.seconds,
+    outputPerIteration: Boolean = false,
+    output: Option[OutputStream] = None) {
+  import Benchmark._
   val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
 
+  val out = if (output.isDefined) {
+    new PrintStream(new TeeOutputStream(System.out, output.get))
+  } else {
+    System.out
+  }
+
   /**
    * Adds a case to run when run() is called. The given function will be run 
for several
    * iterations to collect timing statistics.
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
    */
   def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
     addTimerCase(name, numIters) { timer =>
@@ -58,9 +81,12 @@ private[spark] class Benchmark(
    * Adds a case with manual timing control. When the function is run, timing 
does not start
    * until timer.startTiming() is called within the given function. The 
corresponding
    * timer.stopTiming() method must be called before the function returns.
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
    */
   def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => 
Unit): Unit = {
-    benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters 
else numIters)
+    benchmarks += Benchmark.Case(name, f, numIters)
   }
 
   /**
@@ -75,28 +101,63 @@ private[spark] class Benchmark(
 
     val results = benchmarks.map { c =>
       println("  Running case: " + c.name)
-      Benchmark.measure(valuesPerIteration, c.numIters, 
outputPerIteration)(c.fn)
+      measure(valuesPerIteration, c.numIters)(c.fn)
     }
     println
 
     val firstBest = results.head.bestMs
     // The results are going to be processor specific so it is useful to 
include that.
-    println(Benchmark.getJVMOSInfo())
-    println(Benchmark.getProcessorName())
-    printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", 
"Rate(M/s)",
+    out.println(Benchmark.getJVMOSInfo())
+    out.println(Benchmark.getProcessorName())
+    out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", 
"Rate(M/s)",
       "Per Row(ns)", "Relative")
-    println("-" * 96)
+    out.println("-" * 96)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      printf("%-40s %16s %12s %13s %10s\n",
+      out.printf("%-40s %16s %12s %13s %10s\n",
         benchmark.name,
         "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
         "%10.1f" format result.bestRate,
         "%6.1f" format (1000 / result.bestRate),
         "%3.1fX" format (firstBest / result.bestMs))
     }
-    println
+    out.println
     // scalastyle:on
   }
+
+  /**
+   * Runs a single function `f` for iters, returning the average time the 
function took and
+   * the rate of the function.
+   */
+  def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
+    System.gc()  // ensures garbage from previous cases don't impact this one
+    val warmupDeadline = warmupTime.fromNow
+    while (!warmupDeadline.isOverdue) {
+      f(new Benchmark.Timer(-1))
+    }
+    val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+    val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+    val runTimes = ArrayBuffer[Long]()
+    var i = 0
+    while (i < minIters || runTimes.sum < minDuration) {
+      val timer = new Benchmark.Timer(i)
+      f(timer)
+      val runTime = timer.totalTime()
+      runTimes += runTime
+
+      if (outputPerIteration) {
+        // scalastyle:off
+        println(s"Iteration $i took ${runTime / 1000} microseconds")
+        // scalastyle:on
+      }
+      i += 1
+    }
+    // scalastyle:off
+    println(s"  Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
+    // scalastyle:on
+    val best = runTimes.min
+    val avg = runTimes.sum / runTimes.size
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
+  }
 }
 
 private[spark] object Benchmark {
@@ -161,30 +222,4 @@ private[spark] object Benchmark {
     val osVersion = System.getProperty("os.version")
     s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
   }
-
-  /**
-   * Runs a single function `f` for iters, returning the average time the 
function took and
-   * the rate of the function.
-   */
-  def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Timer => 
Unit): Result = {
-    val runTimes = ArrayBuffer[Long]()
-    for (i <- 0 until iters + 1) {
-      val timer = new Benchmark.Timer(i)
-      f(timer)
-      val runTime = timer.totalTime()
-      if (i > 0) {
-        runTimes += runTime
-      }
-
-      if (outputPerIteration) {
-        // scalastyle:off
-        println(s"Iteration $i took ${runTime / 1000} microseconds")
-        // scalastyle:on
-      }
-    }
-    val best = runTimes.min
-    val avg = runTimes.sum / iters
-    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
-  }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to