Repository: spark
Updated Branches:
  refs/heads/master a05cc425a -> 2404d8e54


Revert "[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset"

This reverts commit a05cc425a0a7d18570b99883993a04ad175aa071.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2404d8e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2404d8e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2404d8e5

Branch: refs/heads/master
Commit: 2404d8e54b6b2cfc78d892e7ebb31578457518a3
Parents: a05cc42
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Dec 27 10:03:52 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Dec 27 10:03:52 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DatasetBenchmark.scala | 75 +++++++++-----------
 1 file changed, 33 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2404d8e5/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
index cd925e6..66d94d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
@@ -33,13 +34,11 @@ object DatasetBenchmark {
   def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long) => l + 1
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back map", numRows)
+    val func = (d: Data) => Data(d.l + 1, d.s)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -54,14 +53,14 @@ object DatasetBenchmark {
       var res = df
       var i = 0
       while (i < numChains) {
-        res = res.select($"l" + 1 as "l")
+        res = res.select($"l" + 1 as "l", $"s")
         i += 1
       }
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.map(func)
@@ -76,14 +75,14 @@ object DatasetBenchmark {
   def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-    val func = (l: Long, i: Int) => l % (100L + i) == 0L
-    val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("back-to-back filter", numRows)
+    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
+    val funcs = 0.until(numChains).map { i =>
+      (d: Data) => func(d, i)
+    }
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -105,7 +104,7 @@ object DatasetBenchmark {
     }
 
     benchmark.addCase("Dataset") { iter =>
-      var res = ds.as[Long]
+      var res = df.as[Data]
       var i = 0
       while (i < numChains) {
         res = res.filter(funcs(i))
@@ -134,29 +133,24 @@ object DatasetBenchmark {
   def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
     import spark.implicits._
 
-    val rdd = spark.sparkContext.range(0, numRows)
-    val ds = spark.range(0, numRows)
-    val df = ds.toDF("l")
-
+    val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
     val benchmark = new Benchmark("aggregate", numRows)
 
+    val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD sum") { iter =>
-      rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
+      rdd.aggregate(0L)(_ + _.l, _ + _)
     }
 
     benchmark.addCase("DataFrame sum") { iter =>
-      df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
+      df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark.addCase("Dataset sum using Aggregator") { iter =>
-      val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
     }
 
-    val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
     benchmark.addCase("Dataset complex Aggregator") { iter =>
-      val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
-      result.queryExecution.toRdd.foreach(_ => Unit)
+      df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
     }
 
     benchmark
@@ -176,39 +170,36 @@ object DatasetBenchmark {
     val benchmark3 = aggregate(spark, numRows)
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                           3963 / 3976         25.2          39.6       1.0X
-    DataFrame                                      826 /  834        121.1           8.3       4.8X
-    Dataset                                       5178 / 5198         19.3          51.8       0.8X
+    RDD                                           3448 / 3646         29.0          34.5       1.0X
+    DataFrame                                     2647 / 3116         37.8          26.5       1.3X
+    Dataset                                       4781 / 5155         20.9          47.8       0.7X
     */
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
+    OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
+    Intel Xeon E3-12xx v2 (Ivy Bridge)
     back-to-back filter:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD                                            533 /  587        187.6           5.3       1.0X
-    DataFrame                                       79 /   91       1269.0           0.8       6.8X
-    Dataset                                        550 /  559        181.7           5.5       1.0X
+    RDD                                           1346 / 1618         74.3          13.5       1.0X
+    DataFrame                                       59 /   72       1695.4           0.6      22.8X
+    Dataset                                       2777 / 2805         36.0          27.8       0.5X
     */
     benchmark2.run()
 
     /*
     Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
     aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    RDD sum                                       2297 / 2440         43.5          23.0       1.0X
-    DataFrame sum                                  630 /  637        158.7           6.3       3.6X
-    Dataset sum using Aggregator                  3129 / 3247         32.0          31.3       0.7X
-    Dataset complex Aggregator                  12109 / 12142          8.3         121.1       0.2X
+    RDD sum                                       1913 / 1942         52.3          19.1       1.0X
+    DataFrame sum                                   46 /   61       2157.7           0.5      41.3X
+    Dataset sum using Aggregator                  4656 / 4758         21.5          46.6       0.4X
+    Dataset complex Aggregator                    6636 / 7039         15.1          66.4       0.3X
     */
     benchmark3.run()
   }
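
For reference, the restored code above depends on a Data case class and a
ComplexAggregator object defined elsewhere in DatasetBenchmark.scala and not
shown in this hunk. Below is a minimal sketch of what those definitions
plausibly look like, inferred only from how the diff uses them
(Data(l, l.toString), Data(d.l + 1, d.s), ComplexAggregator.toColumn); the
aggregator's buffer and output types are assumptions, not copied from the file.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Assumed shape: a Long field `l` and a String field `s`, matching the
// diff's Data(l, l.toString) and Data(d.l + 1, d.s) call sites.
case class Data(l: Long, s: String)

// Assumed shape: a typed Aggregator over Data, so that the benchmark's
// df.as[Data].select(ComplexAggregator.toColumn) compiles. The Data buffer
// and Long output are illustrative choices.
object ComplexAggregator extends Aggregator[Data, Data, Long] {
  override def zero: Data = Data(0, "")
  override def reduce(b: Data, a: Data): Data = Data(b.l + a.l, "")
  override def merge(b1: Data, b2: Data): Data = Data(b1.l + b2.l, "")
  override def finish(reduction: Data): Long = reduction.l
  override def bufferEncoder: Encoder[Data] = Encoders.product[Data]
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}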

