Repository: spark
Updated Branches:
  refs/heads/master a129f0795 -> 94de5609b
[SPARK-25848][SQL][TEST] Refactor CSVBenchmarks to use main method

## What changes were proposed in this pull request?

Run the benchmark with spark-submit:
`bin/spark-submit --class org.apache.spark.sql.execution.datasources.csv.CSVBenchmark --jars ./core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar,./sql/catalyst/target/spark-catalyst_2.11-3.0.0-SNAPSHOT-tests.jar ./sql/core/target/spark-sql_2.11-3.0.0-SNAPSHOT-tests.jar`

Generate the benchmark results:
`SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.datasources.csv.CSVBenchmark"`

## How was this patch tested?

Manual tests.

Closes #22845 from heary-cao/CSVBenchmarks.

Authored-by: caoxuewen <cao.xue...@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94de5609
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94de5609
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94de5609

Branch: refs/heads/master
Commit: 94de5609be27e2618d6d241ec9aa032fbc601b6e
Parents: a129f07
Author: caoxuewen <cao.xue...@zte.com.cn>
Authored: Tue Oct 30 09:18:55 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Tue Oct 30 09:18:55 2018 -0700

----------------------------------------------------------------------
 sql/core/benchmarks/CSVBenchmark-results.txt |  27 ++++
 .../datasources/csv/CSVBenchmark.scala       | 136 ++++++++++++++++
 .../datasources/csv/CSVBenchmarks.scala      | 158 -------------------
 3 files changed, 163 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94de5609/sql/core/benchmarks/CSVBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt
new file mode 100644
index 0000000..865575b
--- /dev/null
+++ b/sql/core/benchmarks/CSVBenchmark-results.txt
@@ -0,0 +1,27 @@
+================================================================================================
+Benchmark to measure CSV read/write performance
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+One quoted string                           64733 / 64839          0.0     1294653.1       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Select 1000 columns                       185609 / 189735          0.0      185608.6       1.0X
+Select 100 columns                          50195 / 51808          0.0       50194.8       3.7X
+Select one column                           39266 / 39293          0.0       39265.6       4.7X
+count()                                     10959 / 11000          0.1       10958.5      16.9X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Select 10 columns + count()                 24637 / 24768          0.4        2463.7       1.0X
+Select 1 column + count()                   20026 / 20076          0.5        2002.6       1.2X
+count()                                      3754 / 3877           2.7         375.4       6.6X
+


http://git-wip-us.apache.org/repos/asf/spark/blob/94de5609/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
new file mode 100644
index 0000000..ce38b08
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.csv
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types._
+
+/**
+ * Benchmark to measure CSV read/write performance.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>,
+ *        <spark catalyst test jar> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/CSVBenchmark-results.txt".
+ * }}}
+ */
+
+object CSVBenchmark extends SqlBasedBenchmark {
+  import spark.implicits._
+
+  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)
+
+    withTempPath { path =>
+      val str = (0 until 10000).map(i => s""""$i"""").mkString(",")
+
+      spark.range(rowsNum)
+        .map(_ => str)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val schema = new StructType().add("value", StringType)
+      val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"One quoted string", numIters) { _ =>
+        ds.filter((_: Row) => true).count()
+      }
+
+      benchmark.run()
+    }
+  }
+
+  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 1000
+    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum, output = output)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+      val values = (0 until colsNum).map(i => i.toString).mkString(",")
+      val columnNames = schema.fieldNames
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
+        ds.select("*").filter((row: Row) => true).count()
+      }
+      val cols100 = columnNames.take(100).map(Column(_))
+      benchmark.addCase(s"Select 100 columns", 3) { _ =>
+        ds.select(cols100: _*).filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"Select one column", 3) { _ =>
+        ds.select($"col1").filter((row: Row) => true).count()
+      }
+      benchmark.addCase(s"count()", 3) { _ =>
+        ds.count()
+      }
+
+      benchmark.run()
+    }
+  }
+
+  def countBenchmark(rowsNum: Int): Unit = {
+    val colsNum = 10
+    val benchmark =
+      new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
+
+    withTempPath { path =>
+      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+      val schema = StructType(fields)
+
+      spark.range(rowsNum)
+        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+        .write
+        .csv(path.getAbsolutePath)
+
+      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+        ds.select("*").filter((_: Row) => true).count()
+      }
+      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+        ds.select($"col1").filter((_: Row) => true).count()
+      }
+      benchmark.addCase(s"count()", 3) { _ =>
+        ds.count()
+      }
+
+      benchmark.run()
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Benchmark to measure CSV read/write performance") {
+      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
+      multiColumnsBenchmark(rowsNum = 1000 * 1000)
+      countBenchmark(10 * 1000 * 1000)
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/spark/blob/94de5609/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
deleted file mode 100644
index 5d1a874..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.csv
-
-import org.apache.spark.SparkConf
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.{Column, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.types._
-
-/**
- * Benchmark to measure CSV read/write performance.
- * To run this:
- *  spark-submit --class <this class> --jars <spark sql test jar>
- */
-object CSVBenchmarks extends SQLHelper {
-  val conf = new SparkConf()
-
-  val spark = SparkSession.builder
-    .master("local[1]")
-    .appName("benchmark-csv-datasource")
-    .config(conf)
-    .getOrCreate()
-  import spark.implicits._
-
-  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
-    val benchmark = new Benchmark(s"Parsing quoted values", rowsNum)
-
-    withTempPath { path =>
-      val str = (0 until 10000).map(i => s""""$i"""").mkString(",")
-
-      spark.range(rowsNum)
-        .map(_ => str)
-        .write.option("header", true)
-        .csv(path.getAbsolutePath)
-
-      val schema = new StructType().add("value", StringType)
-      val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
-
-      benchmark.addCase(s"One quoted string", numIters) { _ =>
-        ds.filter((_: Row) => true).count()
-      }
-
-      /*
-      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
-
-      Parsing quoted values:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      --------------------------------------------------------------------------------------------
-      One quoted string                30273 / 30549          0.0      605451.2       1.0X
-      */
-      benchmark.run()
-    }
-  }
-
-  def multiColumnsBenchmark(rowsNum: Int): Unit = {
-    val colsNum = 1000
-    val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum)
-
-    withTempPath { path =>
-      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
-      val schema = StructType(fields)
-      val values = (0 until colsNum).map(i => i.toString).mkString(",")
-      val columnNames = schema.fieldNames
-
-      spark.range(rowsNum)
-        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
-        .write.option("header", true)
-        .csv(path.getAbsolutePath)
-
-      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
-
-      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
-        ds.select("*").filter((row: Row) => true).count()
-      }
-      val cols100 = columnNames.take(100).map(Column(_))
-      benchmark.addCase(s"Select 100 columns", 3) { _ =>
-        ds.select(cols100: _*).filter((row: Row) => true).count()
-      }
-      benchmark.addCase(s"Select one column", 3) { _ =>
-        ds.select($"col1").filter((row: Row) => true).count()
-      }
-      benchmark.addCase(s"count()", 3) { _ =>
-        ds.count()
-      }
-
-      /*
-      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
-
-      Wide rows with 1000 columns:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      --------------------------------------------------------------------------------------------
-      Select 1000 columns              81091 / 81692          0.0       81090.7       1.0X
-      Select 100 columns               30003 / 34448          0.0       30003.0       2.7X
-      Select one column                24792 / 24855          0.0       24792.0       3.3X
-      count()                          24344 / 24642          0.0       24343.8       3.3X
-      */
-      benchmark.run()
-    }
-  }
-
-  def countBenchmark(rowsNum: Int): Unit = {
-    val colsNum = 10
-    val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum)
-
-    withTempPath { path =>
-      val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
-      val schema = StructType(fields)
-
-      spark.range(rowsNum)
-        .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
-        .write
-        .csv(path.getAbsolutePath)
-
-      val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
-
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
-        ds.select("*").filter((_: Row) => true).count()
-      }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
-        ds.select($"col1").filter((_: Row) => true).count()
-      }
-      benchmark.addCase(s"count()", 3) { _ =>
-        ds.count()
-      }
-
-      /*
-      Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
-
-      Count a dataset with 10 columns:   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      ---------------------------------------------------------------------------------------------
-      Select 10 columns + count()           12598 / 12740          0.8        1259.8       1.0X
-      Select 1 column + count()              7960 /  8175          1.3         796.0       1.6X
-      count()                                2332 /  2386          4.3         233.2       5.4X
-      */
-      benchmark.run()
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
-    multiColumnsBenchmark(rowsNum = 1000 * 1000)
-    countBenchmark(10 * 1000 * 1000)
-  }
-}
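
----------------------------------------------------------------------

For reference, the harness pattern the new file adopts looks like this
in miniature: a sketch built only from pieces visible in the diff above
(SqlBasedBenchmark supplying `spark`, `output`, `runBenchmark`, and the
inherited main method). `ExampleBenchmark` and its case names are
hypothetical and not part of this commit.

  package org.apache.spark.sql.execution.benchmark

  import org.apache.spark.benchmark.Benchmark

  // Hypothetical example; illustrates the SqlBasedBenchmark pattern only.
  object ExampleBenchmark extends SqlBasedBenchmark {

    override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
      runBenchmark("Example suite") {
        val rowsNum = 1000 * 1000
        // Passing `output` lets SPARK_GENERATE_BENCHMARK_FILES=1 route the
        // results to a file under benchmarks/ (per the scaladoc above,
        // benchmarks/ExampleBenchmark-results.txt) instead of stdout.
        val benchmark = new Benchmark("count a generated range", rowsNum, output = output)

        benchmark.addCase("count()", numIters = 3) { _ =>
          spark.range(rowsNum).count()  // `spark` comes from SqlBasedBenchmark
        }
        benchmark.run()
      }
    }
  }

With this shape, the spark-submit and sbt invocations quoted in the pull
request description apply unchanged apart from the class name, which is
the point of the refactor: the benchmark no longer builds its own
SparkSession and main method the way the deleted CSVBenchmarks did.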