This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55f26d8  [SPARK-27533][SQL][TEST] Date and timestamp CSV benchmarks
55f26d8 is described below

commit 55f26d809008d26e9727874128aee0a61dcfea00
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Apr 23 11:08:02 2019 +0900

    [SPARK-27533][SQL][TEST] Date and timestamp CSV benchmarks
    
    ## What changes were proposed in this pull request?
    
    Added new CSV benchmarks for date and timestamp operations:
    - Write dates/timestamps to CSV files
    - `to_csv()` and `from_csv()` for dates and timestamps
    - Read dates/timestamps from CSV files, and infer schemas
    - Parse dates/timestamps from `Dataset[String]`, and infer schemas
    
    The existing CSV benchmarks were also ported to the `NoOp` datasource, which computes every row but discards the results, so that timings reflect CSV processing alone; see the sketch below.
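
    As a rough illustration of what the new cases exercise, a minimal sketch (not the benchmark code itself; it assumes a `SparkSession` named `spark` with `spark.implicits._` imported):

    ```scala
    import org.apache.spark.sql.functions.{from_csv, struct, to_csv}
    import org.apache.spark.sql.types.{StructType, TimestampType}

    // Serialize a timestamp column to CSV text, then parse it back.
    val tsSchema = new StructType().add("timestamp", TimestampType)
    val csvLines = spark.range(0, 10)
      .select($"id".cast(TimestampType).as("timestamp"))
      .select(to_csv(struct($"timestamp")).as("line"))
    val parsed = csvLines.select(from_csv($"line", tsSchema, Map.empty[String, String]))

    // Discard the parsed rows through the NoOp datasource so that only
    // the conversion itself is measured.
    parsed.write.format("noop").save()
    ```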
    
    Closes #24429 from MaxGekk/csv-timestamp-benchmark.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 sql/core/benchmarks/CSVBenchmark-results.txt       |  73 +++++---
 .../execution/datasources/csv/CSVBenchmark.scala   | 201 ++++++++++++++++++---
 2 files changed, 229 insertions(+), 45 deletions(-)

diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt
index 4fef15b..888c2ce 100644
--- a/sql/core/benchmarks/CSVBenchmark-results.txt
+++ b/sql/core/benchmarks/CSVBenchmark-results.txt
@@ -2,29 +2,58 @@
 Benchmark to measure CSV read/write performance
 ================================================================================================
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Parsing quoted values:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
-One quoted string                           49754 / 50158          0.0      995072.2       1.0X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Parsing quoted values:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+One quoted string                                 36998          37134         120          0.0      739953.1       1.0X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Wide rows with 1000 columns:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
-Select 1000 columns                       149402 / 151785          0.0      149401.9       1.0X
-Select 100 columns                          42986 / 43985          0.0       42986.1       3.5X
-Select one column                           33764 / 34057          0.0       33763.6       4.4X
-count()                                       9332 / 9508          0.1        9332.2      16.0X
-Select 100 columns, one bad input field     50963 / 51512          0.0       50962.5       2.9X
-Select 100 columns, corrupt record field    69627 / 71029          0.0       69627.5       2.1X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Wide rows with 1000 columns:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Select 1000 columns                              140620         141162         737          0.0      140620.5       1.0X
+Select 100 columns                                35170          35287         183          0.0       35170.0       4.0X
+Select one column                                 27711          27927         187          0.0       27710.9       5.1X
+count()                                            7707           7804          84          0.1        7707.4      18.2X
+Select 100 columns, one bad input field           41762          41851         117          0.0       41761.8       3.4X
+Select 100 columns, corrupt record field          48717          48761          44          0.0       48717.4       2.9X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic
-Intel(R) Xeon(R) CPU @ 2.50GHz
-Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 22588 / 22623          0.4        2258.8       1.0X
-Select 1 column + count()                   14649 / 14690          0.7        1464.9       1.5X
-count()                                       3385 / 3453          3.0         338.5       6.7X
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Count a dataset with 10 columns:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Select 10 columns + count()                       16001          16053          53          0.6        1600.1       1.0X
+Select 1 column + count()                         11571          11614          58          0.9        1157.1       1.4X
+count()                                            4752           4766          18          2.1         475.2       3.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Create a dataset of timestamps                     1070           1072           2          9.3         107.0       1.0X
+to_csv(timestamp)                                 10446          10746         344          1.0        1044.6       0.1X
+write timestamps to files                          9573           9659         101          1.0         957.3       0.1X
+Create a dataset of dates                          1245           1260          17          8.0         124.5       0.9X
+to_csv(date)                                       7157           7167          11          1.4         715.7       0.1X
+write dates to files                               5415           5450          57          1.8         541.5       0.2X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
+Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
+Read dates and timestamps:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+read timestamp text from files                     1880           1887           8          5.3         188.0       1.0X
+read timestamps from files                        27135          27180          43          0.4        2713.5       0.1X
+infer timestamps from files                       51426          51534          97          0.2        5142.6       0.0X
+read date text from files                          1618           1622           4          6.2         161.8       1.2X
+read date from files                              20207          20218          13          0.5        2020.7       0.1X
+infer date from files                             19418          19479          94          0.5        1941.8       0.1X
+timestamp strings                                  2289           2300          13          4.4         228.9       0.8X
+parse timestamps from Dataset[String]             29367          29391          24          0.3        2936.7       0.1X
+infer timestamps from Dataset[String]             54782          54902         126          0.2        5478.2       0.0X
+date strings                                       2508           2524          16          4.0         250.8       0.7X
+parse dates from Dataset[String]                  21884          21902          19          0.5        2188.4       0.1X
+from_csv(timestamp)                               27188          27723         477          0.4        2718.8       0.1X
+from_csv(date)                                    21137          21191          84          0.5        2113.7       0.1X
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
index 6e6fc47..e41e81a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.io.File
+import java.time.{Instant, LocalDate}
+
 import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.{Column, Dataset, Row}
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 /**
@@ -39,7 +42,9 @@ import org.apache.spark.sql.types._
 object CSVBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
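+  // Write a dataset to the NoOp datasource: every row is computed but
+  // discarded, so timings exclude the cost of materializing results.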
+  private def toNoop(ds: Dataset[_]): Unit = ds.write.format("noop").save()
+
+  private def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark(s"Parsing quoted values", rowsNum, output = 
output)
 
     withTempPath { path =>
@@ -54,14 +59,14 @@ object CSVBenchmark extends SqlBasedBenchmark {
       val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
 
       benchmark.addCase(s"One quoted string", numIters) { _ =>
-        ds.filter((_: Row) => true).count()
+        toNoop(ds)
       }
 
       benchmark.run()
     }
   }
 
-  def multiColumnsBenchmark(rowsNum: Int): Unit = {
+  private def multiColumnsBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 1000
     val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum, 
output = output)
 
@@ -78,25 +83,25 @@ object CSVBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns", 3) { _ =>
-        ds.select("*").filter((row: Row) => true).count()
+      benchmark.addCase(s"Select $colsNum columns", numIters) { _ =>
+        toNoop(ds.select("*"))
       }
       val cols100 = columnNames.take(100).map(Column(_))
-      benchmark.addCase(s"Select 100 columns", 3) { _ =>
-        ds.select(cols100: _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns", numIters) { _ =>
+        toNoop(ds.select(cols100: _*))
       }
-      benchmark.addCase(s"Select one column", 3) { _ =>
-        ds.select($"col1").filter((row: Row) => true).count()
+      benchmark.addCase(s"Select one column", numIters) { _ =>
+        toNoop(ds.select($"col1"))
       }
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
       val schemaErr1 = StructType(StructField("col0", DateType) +:
         (1 until colsNum).map(i => StructField(s"col$i", IntegerType)))
       val dsErr1 = spark.read.schema(schemaErr1).csv(path.getAbsolutePath)
-      benchmark.addCase(s"Select 100 columns, one bad input field", 3) { _ =>
-        dsErr1.select(cols100: _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns, one bad input field", numIters) 
{ _ =>
+        toNoop(dsErr1.select(cols100: _*))
       }
 
       val badRecColName = "badRecord"
@@ -104,15 +109,15 @@ object CSVBenchmark extends SqlBasedBenchmark {
       val dsErr2 = spark.read.schema(schemaErr2)
         .option("columnNameOfCorruptRecord", badRecColName)
         .csv(path.getAbsolutePath)
-      benchmark.addCase(s"Select 100 columns, corrupt record field", 3) { _ =>
-        dsErr2.select((Column(badRecColName) +: cols100): _*).filter((row: Row) => true).count()
+      benchmark.addCase(s"Select 100 columns, corrupt record field", numIters) 
{ _ =>
+        toNoop(dsErr2.select((Column(badRecColName) +: cols100): _*))
       }
 
       benchmark.run()
     }
   }
 
-  def countBenchmark(rowsNum: Int): Unit = {
+  private def countBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
       new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output 
= output)
@@ -128,13 +133,13 @@ object CSVBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+      benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
         ds.select("*").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+      benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
@@ -142,11 +147,161 @@ object CSVBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def datetimeBenchmark(rowsNum: Int, numIters: Int): Unit = {
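+    // The generators below build single-partition datasets (the fourth
+    // argument of spark.range is numPartitions = 1).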
+    def timestamps = {
+      spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+        iter.map(Instant.ofEpochSecond(_))
+      }.select($"value".as("timestamp"))
+    }
+
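+    // Dates wrap around every 100 * 365 days, staying within roughly
+    // the first hundred years from the epoch (1970-01-01).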
+    def dates = {
+      spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+        iter.map(d => LocalDate.ofEpochDay(d % (100 * 365)))
+      }.select($"value".as("date"))
+    }
+
+    withTempPath { path =>
+
+      val timestampDir = new File(path, "timestamp").getAbsolutePath
+      val dateDir = new File(path, "date").getAbsolutePath
+
+      val writeBench = new Benchmark("Write dates and timestamps", rowsNum, 
output = output)
+      writeBench.addCase(s"Create a dataset of timestamps", numIters) { _ =>
+        toNoop(timestamps)
+      }
+
+      writeBench.addCase("to_csv(timestamp)", numIters) { _ =>
+        toNoop(timestamps.select(to_csv(struct($"timestamp"))))
+      }
+
+      writeBench.addCase("write timestamps to files", numIters) { _ =>
+        timestamps.write.option("header", 
true).mode("overwrite").csv(timestampDir)
+      }
+
+      writeBench.addCase("Create a dataset of dates", numIters) { _ =>
+        toNoop(dates)
+      }
+
+      writeBench.addCase("to_csv(date)", numIters) { _ =>
+        toNoop(dates.select(to_csv(struct($"date"))))
+      }
+
+      writeBench.addCase("write dates to files", numIters) { _ =>
+        dates.write.option("header", true).mode("overwrite").csv(dateDir)
+      }
+
+      writeBench.run()
+
+      val readBench = new Benchmark("Read dates and timestamps", rowsNum, 
output = output)
+      val tsSchema = new StructType().add("timestamp", TimestampType)
+
+      readBench.addCase("read timestamp text from files", numIters) { _ =>
+        toNoop(spark.read.text(timestampDir))
+      }
+
+      readBench.addCase("read timestamps from files", numIters) { _ =>
+        val ds = spark.read
+          .option("header", true)
+          .schema(tsSchema)
+          .csv(timestampDir)
+        toNoop(ds)
+      }
+
+      readBench.addCase("infer timestamps from files", numIters) { _ =>
+        val ds = spark.read
+          .option("header", true)
+          .option("inferSchema", true)
+          .csv(timestampDir)
+        toNoop(ds)
+      }
+
+      val dateSchema = new StructType().add("date", DateType)
+
+      readBench.addCase("read date text from files", numIters) { _ =>
+        toNoop(spark.read.text(dateDir))
+      }
+
+      readBench.addCase("read date from files", numIters) { _ =>
+        val ds = spark.read
+          .option("header", true)
+          .schema(dateSchema)
+          .csv(dateDir)
+        toNoop(ds)
+      }
+
+      readBench.addCase("infer date from files", numIters) { _ =>
+        val ds = spark.read
+          .option("header", true)
+          .option("inferSchema", true)
+          .csv(dateDir)
+        toNoop(ds)
+      }
+
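+      // ISO 8601 timestamp strings with a varying milliseconds part,
+      // e.g. "1970-01-01T01:02:03.157Z".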
+      def timestampStr: Dataset[String] = {
+        spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+          iter.map(i => s"1970-01-01T01:02:03.${100 + i % 100}Z")
+        }.select($"value".as("timestamp")).as[String]
+      }
+
+      readBench.addCase("timestamp strings", numIters) { _ =>
+        toNoop(timestampStr)
+      }
+
+      readBench.addCase("parse timestamps from Dataset[String]", numIters) { _ 
=>
+        val ds = spark.read
+          .option("header", false)
+          .schema(tsSchema)
+          .csv(timestampStr)
+        toNoop(ds)
+      }
+
+      readBench.addCase("infer timestamps from Dataset[String]", numIters) { _ 
=>
+        val ds = spark.read
+          .option("header", false)
+          .option("inferSchema", true)
+          .csv(timestampStr)
+        toNoop(ds)
+      }
+
+      def dateStr: Dataset[String] = {
+        spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+          iter.map(i => LocalDate.ofEpochDay(i % 1000 * 365).toString)
+        }.select($"value".as("date")).as[String]
+      }
+
+      readBench.addCase("date strings", numIters) { _ =>
+        toNoop(dateStr)
+      }
+
+      readBench.addCase("parse dates from Dataset[String]", numIters) { _ =>
+        val ds = spark.read
+          .option("header", false)
+          .schema(dateSchema)
+          .csv(dateStr)
+        toNoop(ds)
+      }
+
+      readBench.addCase("from_csv(timestamp)", numIters) { _ =>
+        val ds = timestampStr.select(from_csv($"timestamp", tsSchema, Map.empty[String, String]))
+        toNoop(ds)
+      }
+
+      readBench.addCase("from_csv(date)", numIters) { _ =>
+        val ds = dateStr.select(from_csv($"date", dateSchema, Map.empty[String, String]))
+        toNoop(ds)
+      }
+
+      readBench.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("Benchmark to measure CSV read/write performance") {
-      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
-      multiColumnsBenchmark(rowsNum = 1000 * 1000)
-      countBenchmark(10 * 1000 * 1000)
+      val numIters = 3
+      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
+      multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
+      countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
+      datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
     }
   }
 }

