Repository: spark Updated Branches: refs/heads/master fba9cc846 -> 3d0e17424
[SPARK-21845][SQL] Make codegen fallback of expressions configurable ## What changes were proposed in this pull request? We should make codegen fallback of expressions configurable. So far, it is always on. We might hide it when our codegen has compilation bugs. Thus, we should also disable the codegen fallback when running test cases. ## How was this patch tested? Added test cases Author: gatorsmile <gatorsm...@gmail.com> Closes #19062 from gatorsmile/fallbackCodegen. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0e1742 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0e1742 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0e1742 Branch: refs/heads/master Commit: 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a Parents: fba9cc8 Author: gatorsmile <gatorsm...@gmail.com> Authored: Tue Aug 29 20:59:01 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Tue Aug 29 20:59:01 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../org/apache/spark/sql/execution/SparkPlan.scala | 15 +++++---------- .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +++++++++++- .../org/apache/spark/sql/test/SharedSQLContext.scala | 2 ++ .../org/apache/spark/sql/hive/test/TestHive.scala | 1 + 7 files changed, 24 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 
a685099..24f51ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -551,9 +551,9 @@ object SQLConf { .intConf .createWithDefault(100) - val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback") + val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback") .internal() - .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + + .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" + " fail to compile generated code") .booleanConf .createWithDefault(true) @@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging { def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) - def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) + def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index c7277c2..b1db9dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -56,14 +56,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext - // sqlContext will be null when we are being deserialized on the slaves. In this instance - // the value of subexpressionEliminationEnabled will be set by the deserializer after the - // constructor has run. 
- val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { - sqlContext.conf.subexpressionEliminationEnabled - } else { - false - } + // whether we should fallback when hitting compilation errors caused by codegen + private val codeGenFallBack = sqlContext.conf.codegenFallback + + protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled /** Overridden make copy also propagates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { @@ -370,8 +366,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ try { GeneratePredicate.generate(expression, inputSchema) } catch { - case e @ (_: JaninoRuntimeException | _: CompileException) - if sqlContext == null || sqlContext.conf.wholeStageFallback => + case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack => genInterpretedPredicate(expression, inputSchema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index bacb709..a41a7ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co try { CodeGenerator.compile(cleanedSource) } catch { - case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback => + case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback => // We should already saw the error message logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") return 
child.execute() http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 0681b9c..50e4759 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { v } withSQLConf( - (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString), + (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString), (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) { val df = spark.range(0, 4, 1, 4).withColumn("c", c) val rows = df.collect() http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5eb34e5..1334164 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val filter = (0 until N) .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string")) - df.filter(filter).count + + withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") { + df.filter(filter).count() + } + + withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") { + val e = intercept[SparkException] { + df.filter(filter).count() + }.getMessage + assert(e.contains("grows beyond 64 KB")) + } } test("SPARK-20897: cached 
self-join should not fail") { http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 1f073d5..cd8d070 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.internal.SQLConf /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. @@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua new SparkConf() .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 10c9a2d..0f6a81b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -51,6 +51,7 @@ object TestHive "TestSQLContext", new SparkConf() .set("spark.sql.test", "") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") .set("spark.sql.hive.metastore.barrierPrefixes", "org.apache.spark.sql.hive.execution.PairSerDe") .set("spark.sql.warehouse.dir", 
TestHiveContext.makeWarehouseDir().toURI.getPath) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org