This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 414d323 [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark 414d323 is described below commit 414d323d6c92584beb87e1c426e4beab5ddbd452 Author: Takeshi Yamamuro <yamam...@apache.org> AuthorDate: Mon Jan 4 10:31:20 2021 -0800 [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark ### What changes were proposed in this pull request? This PR intends to add a new option `--cbo` to enable CBO in TPCDSQueryBenchmark. I think this option is useful so as to monitor performance changes with CBO enabled. ### Why are the changes needed? To monitor performance changes with CBO enabled. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checked. Closes #31011 from maropu/AddOptionForCBOInTPCDSBenchmark. Authored-by: Takeshi Yamamuro <yamam...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../execution/benchmark/TPCDSQueryBenchmark.scala | 39 +++++++++++++++++++--- .../benchmark/TPCDSQueryBenchmarkArguments.scala | 6 ++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index f931914..b34eac5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -19,11 +19,15 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark +import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ +import 
org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils /** * Benchmark to measure TPCDS query performance. @@ -38,7 +42,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation * Results will be written to "benchmarks/TPCDSQueryBenchmark-results.txt". * }}} */ -object TPCDSQueryBenchmark extends SqlBasedBenchmark { +object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging { + + private lazy val warehousePath = + Utils.createTempDir(namePrefix = "spark-warehouse").getAbsolutePath override def getSparkSession: SparkSession = { val conf = new SparkConf() @@ -50,6 +57,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { .set("spark.executor.memory", "3g") .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString) .set("spark.sql.crossJoin.enabled", "true") + .set("spark.sql.warehouse.dir", warehousePath) SparkSession.builder.config(conf).getOrCreate() } @@ -60,9 +68,14 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { "web_returns", "web_site", "reason", "call_center", "warehouse", "ship_mode", "income_band", "time_dim", "web_page") - def setupTables(dataLocation: String): Map[String, Long] = { + def setupTables(dataLocation: String, createTempView: Boolean): Map[String, Long] = { tables.map { tableName => - spark.read.parquet(s"$dataLocation/$tableName").createOrReplaceTempView(tableName) + val df = spark.read.parquet(s"$dataLocation/$tableName") + if (createTempView) { + df.createOrReplaceTempView(tableName) + } else { + df.write.saveAsTable(tableName) + } tableName -> spark.table(tableName).count() }.toMap } @@ -146,7 +159,25 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { s"Empty queries to run. 
Bad query name filter: ${benchmarkArgs.queryFilter}") } - val tableSizes = setupTables(benchmarkArgs.dataLocation) + val tableSizes = setupTables(benchmarkArgs.dataLocation, + createTempView = !benchmarkArgs.cboEnabled) + if (benchmarkArgs.cboEnabled) { + spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=true") + spark.sql(s"SET ${SQLConf.PLAN_STATS_ENABLED.key}=true") + spark.sql(s"SET ${SQLConf.JOIN_REORDER_ENABLED.key}=true") + spark.sql(s"SET ${SQLConf.HISTOGRAM_ENABLED.key}=true") + + // Analyze all the tables before running TPCDS queries + val startTime = System.nanoTime() + tables.foreach { tableName => + spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR ALL COLUMNS") + } + logInfo("The elapsed time to analyze all the tables is " + + s"${(System.nanoTime() - startTime) / NANOS_PER_SECOND.toDouble} seconds") + } else { + spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=false") + } + runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes) runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes, nameSuffix = nameSuffixForQueriesV2_7) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala index 184ffff..80a6bff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala @@ -23,6 +23,7 @@ import java.util.Locale class TPCDSQueryBenchmarkArguments(val args: Array[String]) { var dataLocation: String = null var queryFilter: Set[String] = Set.empty + var cboEnabled: Boolean = false parseArgs(args.toList) validateArguments() @@ -44,6 +45,10 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) { queryFilter = value.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet args = tail + case optName 
:: tail if optionMatch("--cbo", optName) => + cboEnabled = true + args = tail + case _ => // scalastyle:off println System.err.println("Unknown/unsupported param " + args) @@ -60,6 +65,7 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) { |Options: | --data-location Path to TPCDS data | --query-filter Queries to filter, e.g., q3,q5,q13 + | --cbo Whether to enable cost-based optimization | |------------------------------------------------------------------------------------------------------------------ |In order to run this benchmark, please follow the instructions at --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org