This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 49c062b [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark 49c062b is described below commit 49c062b2e0487b13b732b18edde105e1f000c20d Author: Peter Toth <peter.t...@gmail.com> AuthorDate: Wed Jan 9 09:54:21 2019 -0800 [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark ## What changes were proposed in this pull request? Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark to use main method. ## How was this patch tested? Manually tested and regenerated results. Please note that `spark.memory.debugFill` setting has a huge impact on this benchmark. Since it is set to true by default when running the benchmark from SBT, we need to disable it: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions in Test += \"-Dspark.memory.debugFill=false\";test:runMain org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark" ``` Closes #22617 from peter-toth/SPARK-25484. 
Lead-authored-by: Peter Toth <peter.t...@gmail.com> Co-authored-by: Peter Toth <pt...@hortonworks.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- ...alAppendOnlyUnsafeRowArrayBenchmark-results.txt | 45 ++++++ ...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 158 ++++++++------------- 2 files changed, 105 insertions(+), 98 deletions(-) diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt new file mode 100644 index 0000000..02c6b72 --- /dev/null +++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt @@ -0,0 +1,45 @@ +================================================================================================ +WITHOUT SPILL +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +ArrayBuffer 6378 / 6550 16.1 62.3 1.0X +ExternalAppendOnlyUnsafeRowArray 6196 / 6242 16.5 60.5 1.0X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +ArrayBuffer 11988 / 12027 21.9 45.7 1.0X +ExternalAppendOnlyUnsafeRowArray 37480 / 37574 7.0 143.0 0.3X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative 
+------------------------------------------------------------------------------------------------ +ArrayBuffer 23536 / 23538 20.9 47.9 1.0X +ExternalAppendOnlyUnsafeRowArray 31275 / 31277 15.7 63.6 0.8X + + +================================================================================================ +WITH SPILL +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +UnsafeExternalSorter 29241 / 29279 9.0 111.5 1.0X +ExternalAppendOnlyUnsafeRowArray 14309 / 14313 18.3 54.6 2.0X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +UnsafeExternalSorter 11 / 11 14.8 67.4 1.0X +ExternalAppendOnlyUnsafeRowArray 9 / 9 17.6 56.8 1.2X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 611b2fc..e174dc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -20,24 +20,57 @@ package org.apache.spark.sql.execution import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext} -import org.apache.spark.benchmark.Benchmark +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import 
org.apache.spark.internal.config import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter -object ExternalAppendOnlyUnsafeRowArrayBenchmark { +/** + * Benchmark ExternalAppendOnlyUnsafeRowArray. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar> + * 2. build/sbt ";project sql;set javaOptions + * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions + * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>" + * Results will be written to + * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt". + * }}} + */ +object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase { - def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { + private val conf = new SparkConf(false) + // Make the Java serializer write a reset instruction (TC_RESET) after each object to test + // for a bug we had with bytes written past the last object in a batch (SPARK-2792) + .set("spark.serializer.objectStreamReset", "1") + .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + + private def withFakeTaskContext(f: => Unit): Unit = { + val sc = new SparkContext("local", "test", conf) + val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) + TaskContext.setTaskContext(taskContext) + f + sc.stop() + } + + private def testRows(numRows: Int): Seq[UnsafeRow] = { val random = new java.util.Random() - val rows = (1 to numRows).map(_ => { + (1 to numRows).map(_ => { val row = new UnsafeRow(1) row.pointTo(new Array[Byte](64), 16) row.setLong(0, random.nextLong()) row }) + } - val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows) 
+ def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { + val rows = testRows(numRows) + + val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows, + output = output) // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an // in-memory buffer of size `numSpillThreshold`. This will mimic that @@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { } } - val conf = new SparkConf(false) - // Make the Java serializer write a reset instruction (TC_RESET) after each object to test - // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") - - val sc = new SparkContext("local", "test", conf) - val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) - TaskContext.setTaskContext(taskContext) - benchmark.run() - sc.stop() + withFakeTaskContext { + benchmark.run() + } } def testAgainstRawUnsafeExternalSorter( numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = { + val rows = testRows(numRows) - val random = new java.util.Random() - val rows = (1 to numRows).map(_ => { - val row = new UnsafeRow(1) - row.pointTo(new Array[Byte](64), 16) - row.setLong(0, random.nextLong()) - row - }) - - val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows) + val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows, + output = output) benchmark.addCase("UnsafeExternalSorter") { _: Int => var sum = 0L @@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark { } } - val conf = new SparkConf(false) - // Make the Java serializer write a reset instruction (TC_RESET) after each object to test - // for a bug we had with bytes written past the last object in a batch (SPARK-2792) - conf.set("spark.serializer.objectStreamReset", "1") - conf.set("spark.serializer", 
"org.apache.spark.serializer.JavaSerializer") - - val sc = new SparkContext("local", "test", conf) - val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get) - TaskContext.setTaskContext(taskContext) - benchmark.run() - sc.stop() + withFakeTaskContext { + benchmark.run() + } } - def main(args: Array[String]): Unit = { - - // ========================================================================================= // - // WITHOUT SPILL - // ========================================================================================= // - - val spillThreshold = 100 * 1000 - - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - ArrayBuffer 7821 / 7941 33.5 29.8 1.0X - ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X - */ - testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18) - - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - ArrayBuffer 19200 / 19206 25.6 39.1 1.0X - ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X - */ - testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14) - - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - ArrayBuffer 5949 / 6028 17.2 58.1 1.0X - ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X - */ - testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10) - - // ========================================================================================= // - // WITH SPILL - // ========================================================================================= // - - /* - 
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X - ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X - */ - testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) - - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - - Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X - ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X - */ - testAgainstRawUnsafeExternalSorter( - config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4) + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("WITHOUT SPILL") { + val spillThreshold = 100 * 1000 + testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10) + testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18) + testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14) + } + + runBenchmark("WITH SPILL") { + testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18) + testAgainstRawUnsafeExternalSorter( + config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org