This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 55f26d8  [SPARK-27533][SQL][TEST] Date and timestamp CSV benchmarks
55f26d8 is described below

commit 55f26d809008d26e9727874128aee0a61dcfea00
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Apr 23 11:08:02 2019 +0900

    [SPARK-27533][SQL][TEST] Date and timestamp CSV benchmarks

    ## What changes were proposed in this pull request?

    Added new CSV benchmarks for date and timestamp operations:
    - Write dates/timestamps to CSV files
    - `to_csv()` and `from_csv()` for dates and timestamps
    - Read dates/timestamps from CSV files, and infer schemas
    - Parse dates/timestamps and infer schemas from `Dataset[String]`

    The existing CSV benchmarks were also ported to the `NoOp` datasource.

    Closes #24429 from MaxGekk/csv-timestamp-benchmark.

    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 sql/core/benchmarks/CSVBenchmark-results.txt       |  73 +++++---
 .../execution/datasources/csv/CSVBenchmark.scala   | 201 ++++++++++++++++++---
 2 files changed, 229 insertions(+), 45 deletions(-)
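Porting the cases from `.filter(...).count()` to the `NoOp` datasource matters for the numbers: ending a case with `count()` lets the CSV reader skip work on columns whose values are never consumed, so the timings can understate parsing cost, while a write to the `noop` sink executes the whole plan and discards the rows. A minimal sketch of the pattern the patch factors out as `toNoop` (the local session below is illustrative, not part of the patch):

    import org.apache.spark.sql.{Dataset, SparkSession}

    object NoopSinkSketch {
      // Illustrative local session; the real benchmark obtains `spark`
      // from the SqlBasedBenchmark harness.
      private val spark = SparkSession.builder()
        .master("local[1]")
        .appName("noop-sink-sketch")
        .getOrCreate()

      // Same pattern as the toNoop helper in the diff below: the "noop"
      // sink fully executes the plan and throws the rows away, so the
      // measured time reflects parsing rather than count() shortcuts.
      def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
    }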
diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt
index 4fef15b..888c2ce 100644
--- a/sql/core/benchmarks/CSVBenchmark-results.txt
+++ b/sql/core/benchmarks/CSVBenchmark-results.txt
@@ -2,29 +2,58 @@ Benchmark to measure CSV read/write performance
 ================================================================================================
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Parsing quoted values:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
-One quoted string                           49754 / 50158          0.0      995072.2       1.0X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Parsing quoted values:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+One quoted string                                 36998          37134         120          0.0      739953.1       1.0X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Wide rows with 1000 columns:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
-Select 1000 columns                       149402 / 151785          0.0      149401.9       1.0X
-Select 100 columns                          42986 / 43985          0.0       42986.1       3.5X
-Select one column                           33764 / 34057          0.0       33763.6       4.4X
-count()                                        9332 / 9508          0.1        9332.2      16.0X
-Select 100 columns, one bad input field     50963 / 51512          0.0       50962.5       2.9X
-Select 100 columns, corrupt record field    69627 / 71029          0.0       69627.5       2.1X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Wide rows with 1000 columns:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Select 1000 columns                              140620         141162         737          0.0      140620.5       1.0X
+Select 100 columns                                35170          35287         183          0.0       35170.0       4.0X
+Select one column                                 27711          27927         187          0.0       27710.9       5.1X
+count()                                            7707           7804          84          0.1        7707.4      18.2X
+Select 100 columns, one bad input field           41762          41851         117          0.0       41761.8       3.4X
+Select 100 columns, corrupt record field          48717          48761          44          0.0       48717.4       2.9X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Count a dataset with 10 columns:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 22588 / 22623          0.4        2258.8       1.0X
-Select 1 column + count()                   14649 / 14690          0.7        1464.9       1.5X
-count()                                       3385 / 3453          3.0         338.5       6.7X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Count a dataset with 10 columns:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Select 10 columns + count()                       16001          16053          53          0.6        1600.1       1.0X
+Select 1 column + count()                         11571          11614          58          0.9        1157.1       1.4X
+count()                                            4752           4766          18          2.1         475.2       3.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Create a dataset of timestamps                     1070           1072           2          9.3         107.0       1.0X
+to_csv(timestamp)                                 10446          10746         344          1.0        1044.6       0.1X
+write timestamps to files                          9573           9659         101          1.0         957.3       0.1X
+Create a dataset of dates                          1245           1260          17          8.0         124.5       0.9X
+to_csv(date)                                       7157           7167          11          1.4         715.7       0.1X
+write dates to files                               5415           5450          57          1.8         541.5       0.2X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Read dates and timestamps:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+read timestamp text from files                     1880           1887           8          5.3         188.0       1.0X
+read timestamps from files                        27135          27180          43          0.4        2713.5       0.1X
+infer timestamps from files                       51426          51534          97          0.2        5142.6       0.0X
+read date text from files                          1618           1622           4          6.2         161.8       1.2X
+read date from files                              20207          20218          13          0.5        2020.7       0.1X
+infer date from files                             19418          19479          94          0.5        1941.8       0.1X
+timestamp strings                                  2289           2300          13          4.4         228.9       0.8X
+parse timestamps from Dataset[String]             29367          29391          24          0.3        2936.7       0.1X
+infer timestamps from Dataset[String]             54782          54902         126          0.2        5478.2       0.0X
+date strings                                       2508           2524          16          4.0         250.8       0.7X
+parse dates from Dataset[String]                  21884          21902          19          0.5        2188.4       0.1X
+from_csv(timestamp)                               27188          27723         477          0.4        2718.8       0.1X
+from_csv(date)                                    21137          21191          84          0.5        2113.7       0.1X
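Two readings of the new tables before the code: parsing timestamps with an explicit schema ("read timestamps from files", 27135 ms best) costs roughly half of schema inference ("infer timestamps from files", 51426 ms), and the `to_csv`/`from_csv` expression paths track the file-based paths closely. For readers unfamiliar with that function pair, a small sketch of round-tripping a timestamp column through a CSV string; the `df` parameter and column names are illustrative, not from the patch:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{from_csv, struct, to_csv}
    import org.apache.spark.sql.types.{StructType, TimestampType}

    // Round-trip a single "timestamp" column, the same shape the
    // benchmark uses for its to_csv/from_csv cases.
    def roundTrip(spark: SparkSession, df: DataFrame): DataFrame = {
      import spark.implicits._
      val tsSchema = new StructType().add("timestamp", TimestampType)
      df.select(to_csv(struct($"timestamp")).as("csv"))                // timestamp -> CSV string
        .select(from_csv($"csv", tsSchema, Map.empty[String, String])) // CSV string -> struct
    }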
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index 6e6fc47..e41e81a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.io.File
+import java.time.{Instant, LocalDate}
+
 import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.{Column, Dataset, Row}
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 /**
@@ -39,7 +42,9 @@ import org.apache.spark.sql.types._
 object CSVBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
+  private def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
+
+  private def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = output)
 
     withTempPath { path =>
@@ -54,14 +59,14 @@ object CSVBenchmark extends SqlBasedBenchmark {
       val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
 
       benchmark.addCase(s"One quoted string", numIters) { _ =>
-        ds.filter((_: Row) => true).count()
+        toNoop(ds)
       }
 
       benchmark.run()
     }
   }
 
-  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+  private def multiColumnsBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 1000
     val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum, output = output)
 
@@ -78,25 +83,25 @@ object CSVBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
-        ds.select("*").filter((row: Row) => true).count()
+      benchmark.addCase(s"Select $colsNum columns", numIters) { _ =>
+        toNoop(ds.select("*"))
       }
 
       val cols100 = columnNames.take(100).map(Column(_))
-      benchmark.addCase(s"Select 100 columns", 3) { _ =>
-        ds.select(cols100: _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns", numIters) { _ =>
+        toNoop(ds.select(cols100: _*))
       }
 
-      benchmark.addCase(s"Select one column", 3) { _ =>
-        ds.select($"col1").filter((row: Row) => true).count()
+      benchmark.addCase(s"Select one column", numIters) { _ =>
+        toNoop(ds.select($"col1"))
       }
 
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
       val schemaErr1 = StructType(StructField("col0", DateType) +:
         (1 until colsNum).map(i => StructField(s"col$i", IntegerType)))
       val dsErr1 = spark.read.schema(schemaErr1).csv(path.getAbsolutePath)
-      benchmark.addCase(s"Select 100 columns, one bad input field", 3) { _ =>
-        dsErr1.select(cols100: _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns, one bad input field", numIters) { _ =>
+        toNoop(dsErr1.select(cols100: _*))
       }
 
       val badRecColName = "badRecord"
@@ -104,15 +109,15 @@ object CSVBenchmark extends SqlBasedBenchmark {
       val dsErr2 = spark.read.schema(schemaErr2)
         .option("columnNameOfCorruptRecord", badRecColName)
         .csv(path.getAbsolutePath)
-      benchmark.addCase(s"Select 100 columns, corrupt record field", 3) { _ =>
-        dsErr2.select((Column(badRecColName) +: cols100): _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns, corrupt record field", numIters) { _ =>
+        toNoop(dsErr2.select((Column(badRecColName) +: cols100): _*))
       }
 
       benchmark.run()
     }
   }
 
-  def countBenchmark(rowsNum: Int): Unit = {
+  private def countBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
       new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
@@ -128,13 +133,13 @@ object CSVBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+      benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
         ds.select("*").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+      benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
benchmark.addCase(s"count()", 3) { _ => + benchmark.addCase(s"count()", numIters) { _ => ds.count() } @@ -142,11 +147,161 @@ object CSVBenchmark extends SqlBasedBenchmark { } } + private def datetimeBenchmark(rowsNum: Int, numIters: Int): Unit = { + def timestamps = { + spark.range(0, rowsNum, 1, 1).mapPartitions { iter => + iter.map(Instant.ofEpochSecond(_)) + }.select($"value".as("timestamp")) + } + + def dates = { + spark.range(0, rowsNum, 1, 1).mapPartitions { iter => + iter.map(d => LocalDate.ofEpochDay(d % (100 * 365))) + }.select($"value".as("date")) + } + + withTempPath { path => + + val timestampDir = new File(path, "timestamp").getAbsolutePath + val dateDir = new File(path, "date").getAbsolutePath + + val writeBench = new Benchmark("Write dates and timestamps", rowsNum, output = output) + writeBench.addCase(s"Create a dataset of timestamps", numIters) { _ => + toNoop(timestamps) + } + + writeBench.addCase("to_csv(timestamp)", numIters) { _ => + toNoop(timestamps.select(to_csv(struct($"timestamp")))) + } + + writeBench.addCase("write timestamps to files", numIters) { _ => + timestamps.write.option("header", true).mode("overwrite").csv(timestampDir) + } + + writeBench.addCase("Create a dataset of dates", numIters) { _ => + toNoop(dates) + } + + writeBench.addCase("to_csv(date)", numIters) { _ => + toNoop(dates.select(to_csv(struct($"date")))) + } + + writeBench.addCase("write dates to files", numIters) { _ => + dates.write.option("header", true).mode("overwrite").csv(dateDir) + } + + writeBench.run() + + val readBench = new Benchmark("Read dates and timestamps", rowsNum, output = output) + val tsSchema = new StructType().add("timestamp", TimestampType) + + readBench.addCase("read timestamp text from files", numIters) { _ => + toNoop(spark.read.text(timestampDir)) + } + + readBench.addCase("read timestamps from files", numIters) { _ => + val ds = spark.read + .option("header", true) + .schema(tsSchema) + .csv(timestampDir) + toNoop(ds) + } + + readBench.addCase("infer timestamps from files", numIters) { _ => + val ds = spark.read + .option("header", true) + .option("inferSchema", true) + .csv(timestampDir) + toNoop(ds) + } + + val dateSchema = new StructType().add("date", DateType) + + readBench.addCase("read date text from files", numIters) { _ => + toNoop(spark.read.text(dateDir)) + } + + readBench.addCase("read date from files", numIters) { _ => + val ds = spark.read + .option("header", true) + .schema(dateSchema) + .csv(dateDir) + toNoop(ds) + } + + readBench.addCase("infer date from files", numIters) { _ => + val ds = spark.read + .option("header", true) + .option("inferSchema", true) + .csv(dateDir) + toNoop(ds) + } + + def timestampStr: Dataset[String] = { + spark.range(0, rowsNum, 1, 1).mapPartitions { iter => + iter.map(i => s"1970-01-01T01:02:03.${100 + i % 100}Z") + }.select($"value".as("timestamp")).as[String] + } + + readBench.addCase("timestamp strings", numIters) { _ => + toNoop(timestampStr) + } + + readBench.addCase("parse timestamps from Dataset[String]", numIters) { _ => + val ds = spark.read + .option("header", false) + .schema(tsSchema) + .csv(timestampStr) + toNoop(ds) + } + + readBench.addCase("infer timestamps from Dataset[String]", numIters) { _ => + val ds = spark.read + .option("header", false) + .option("inferSchema", true) + .csv(timestampStr) + toNoop(ds) + } + + def dateStr: Dataset[String] = { + spark.range(0, rowsNum, 1, 1).mapPartitions { iter => + iter.map(i => LocalDate.ofEpochDay(i % 1000 * 365).toString) + 
}.select($"value".as("date")).as[String] + } + + readBench.addCase("date strings", numIters) { _ => + toNoop(dateStr) + } + + readBench.addCase("parse dates from Dataset[String]", numIters) { _ => + val ds = spark.read + .option("header", false) + .schema(dateSchema) + .csv(dateStr) + toNoop(ds) + } + + readBench.addCase("from_csv(timestamp)", numIters) { _ => + val ds = timestampStr.select(from_csv($"timestamp", tsSchema, Map.empty[String, String])) + toNoop(ds) + } + + readBench.addCase("from_csv(date)", numIters) { _ => + val ds = dateStr.select(from_csv($"date", dateSchema, Map.empty[String, String])) + toNoop(ds) + } + + readBench.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runBenchmark("Benchmark to measure CSV read/write performance") { - quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3) - multiColumnsBenchmark(rowsNum = 1000 * 1000) - countBenchmark(10 * 1000 * 1000) + val numIters = 3 + quotedValuesBenchmark(rowsNum = 50 * 1000, numIters) + multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters) + countBenchmark(rowsNum = 10 * 1000 * 1000, numIters) + datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org