GitHub user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22804#discussion_r227582826
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala ---
    @@ -21,207 +21,212 @@ import scala.concurrent.duration._
     
     import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox
     
    -import org.apache.spark.benchmark.Benchmark
    -import org.apache.spark.sql.Column
    -import org.apache.spark.sql.catalyst.FunctionIdentifier
    -import org.apache.spark.sql.catalyst.catalog.CatalogFunction
    +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
    +import org.apache.spark.sql.{Column, SparkSession}
     import org.apache.spark.sql.catalyst.expressions.Literal
     import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
    -import org.apache.spark.sql.hive.HiveSessionCatalog
    +import org.apache.spark.sql.catalyst.plans.SQLHelper
     import org.apache.spark.sql.hive.execution.TestingTypedCount
    -import org.apache.spark.sql.hive.test.TestHiveSingleton
    +import org.apache.spark.sql.hive.test.TestHive
     import org.apache.spark.sql.internal.SQLConf
     import org.apache.spark.sql.types.LongType
     
    -class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton {
    -  ignore("Hive UDAF vs Spark AF") {
    -    val N = 2 << 15
    -
    -    val benchmark = new Benchmark(
    -      name = "hive udaf vs spark af",
    -      valuesPerIteration = N,
    -      minNumIters = 5,
    -      warmupTime = 5.seconds,
    -      minTime = 10.seconds,
    -      outputPerIteration = true
    -    )
    -
    -    registerHiveFunction("hive_percentile_approx", classOf[GenericUDAFPercentileApprox])
    -
    -    sparkSession.range(N).createOrReplaceTempView("t")
    -
    -    benchmark.addCase("hive udaf w/o group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
    -      sparkSession.sql("SELECT hive_percentile_approx(id, 0.5) FROM 
t").collect()
    -    }
    -
    -    benchmark.addCase("spark af w/o group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      sparkSession.sql("SELECT percentile_approx(id, 0.5) FROM 
t").collect()
    -    }
    -
    -    benchmark.addCase("hive udaf w/ group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
    -      sparkSession.sql(
    -        s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / 
${N / 4} AS BIGINT)"
    -      ).collect()
    -    }
    -
    -    benchmark.addCase("spark af w/ group by w/o fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      sparkSession.sql(
    -        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N 
/ 4} AS BIGINT)"
    -      ).collect()
    -    }
    -
    -    benchmark.addCase("spark af w/ group by w/ fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
    -      sparkSession.sql(
    -        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N 
/ 4} AS BIGINT)"
    -      ).collect()
    -    }
    -
    -    benchmark.run()
    -
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    -
    -    hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -    ------------------------------------------------------------------------------------------------
    -    hive udaf w/o group by                        5326 / 5408          0.0       81264.2       1.0X
    -    spark af w/o group by                           93 /  111          0.7        1415.6      57.4X
    -    hive udaf w/ group by                         3804 / 3946          0.0       58050.1       1.4X
    -    spark af w/ group by w/o fallback               71 /   90          0.9        1085.7      74.8X
    -    spark af w/ group by w/ fallback                98 /  111          0.7        1501.6      54.1X
    -     */
    -  }
    -
    -  ignore("ObjectHashAggregateExec vs SortAggregateExec - typed_count") {
    -    val N: Long = 1024 * 1024 * 100
    -
    -    val benchmark = new Benchmark(
    -      name = "object agg v.s. sort agg",
    -      valuesPerIteration = N,
    -      minNumIters = 1,
    -      warmupTime = 10.seconds,
    -      minTime = 45.seconds,
    -      outputPerIteration = true
    -    )
    -
    -    import sparkSession.implicits._
    -
    -    def typed_count(column: Column): Column =
    -      Column(TestingTypedCount(column.expr).toAggregateExpression())
    -
    -    val df = sparkSession.range(N)
    -
    -    benchmark.addCase("sort agg w/ group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
    -      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
    -    }
    -
    -    benchmark.addCase("object agg w/ group by w/o fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
    -    }
    -
    -    benchmark.addCase("object agg w/ group by w/ fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
    -      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
    -    }
    -
    -    benchmark.addCase("sort agg w/o group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
    -      df.select(typed_count($"id")).collect()
    -    }
    -
    -    benchmark.addCase("object agg w/o group by w/o fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      df.select(typed_count($"id")).collect()
    -    }
    -
    -    benchmark.run()
    -
    -    /*
    -    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
    -    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    -
    -    object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -    ------------------------------------------------------------------------------------------------
    -    sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
    -    object agg w/ group by w/o fallback           6903 / 7141         15.2          65.8       4.5X
    -    object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
    -    sort agg w/o group by                         4734 / 5463         22.1          45.2       6.6X
    -    object agg w/o group by w/o fallback          4310 / 4529         24.3          41.1       7.3X
    -     */
    -  }
    -
    -  ignore("ObjectHashAggregateExec vs SortAggregateExec - 
percentile_approx") {
    -    val N = 2 << 20
    -
    -    val benchmark = new Benchmark(
    -      name = "object agg v.s. sort agg",
    -      valuesPerIteration = N,
    -      minNumIters = 5,
    -      warmupTime = 15.seconds,
    -      minTime = 45.seconds,
    -      outputPerIteration = true
    -    )
    -
    -    import sparkSession.implicits._
    -
    -    val df = sparkSession.range(N).coalesce(1)
    -
    -    benchmark.addCase("sort agg w/ group by") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
    -      df.groupBy($"id" / (N / 4) cast 
LongType).agg(percentile_approx($"id", 0.5)).collect()
    -    }
    -
    -    benchmark.addCase("object agg w/ group by w/o fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      df.groupBy($"id" / (N / 4) cast 
LongType).agg(percentile_approx($"id", 0.5)).collect()
    -    }
    -
    -    benchmark.addCase("object agg w/ group by w/ fallback") { _ =>
    -      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
    -      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
    -      df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
    +/**
    + * Benchmark to measure hash based aggregation.
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt: bin/spark-submit --class <this class>
    + *        --jars <spark catalyst test jar>,<spark core test jar>,<spark 
hive jar>
    + *        --packages org.spark-project.hive:hive-exec:1.2.1.spark2
    + *        <spark hive test jar>
    + *   2. build/sbt "hive/test:runMain <this class>"
    + *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>"
    + *      Results will be written to "benchmarks/ObjectHashAggregateExecBenchmark-results.txt".
    + * }}}
    + */
    +object ObjectHashAggregateExecBenchmark extends BenchmarkBase with SQLHelper {
    +
    +  val spark: SparkSession = TestHive.sparkSession
    +
    +  override def runBenchmarkSuite(): Unit = {
    +    runBenchmark("Hive UDAF vs Spark AF") {
    --- End diff --
    
    Hi, @peter-toth. Thank you for making this PR.
    Currently, `runBenchmarkSuite` is too long. Could you make a separate function for each test case? For example, the body of `ignore("Hive UDAF vs Spark AF")` can become a single function, and `runBenchmarkSuite` can then call that series of functions.
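    
    To illustrate the suggested shape (a minimal sketch only; the private method names and the elided bodies are mine, not from the PR):
    
    ```scala
    import org.apache.spark.benchmark.BenchmarkBase
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.SQLHelper
    import org.apache.spark.sql.hive.test.TestHive
    
    object ObjectHashAggregateExecBenchmark extends BenchmarkBase with SQLHelper {
    
      val spark: SparkSession = TestHive.sparkSession
    
      // One private method per former test case; each wraps its old body unchanged.
      private def hiveUDAFvsSparkAF(): Unit = {
        runBenchmark("Hive UDAF vs Spark AF") {
          // ... body of the former ignore("Hive UDAF vs Spark AF") ...
        }
      }
    
      private def objectHashAggregateExecVsSortAggregateExecTypedCount(): Unit = {
        runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - typed_count") {
          // ... body of the former typed_count test case ...
        }
      }
    
      private def objectHashAggregateExecVsSortAggregateExecPercentileApprox(): Unit = {
        runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - percentile_approx") {
          // ... body of the former percentile_approx test case ...
        }
      }
    
      // The suite body then becomes a short list of calls.
      override def runBenchmarkSuite(): Unit = {
        hiveUDAFvsSparkAF()
        objectHashAggregateExecVsSortAggregateExecTypedCount()
        objectHashAggregateExecVsSortAggregateExecPercentileApprox()
      }
    }
    ```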

