Repository: spark
Updated Branches:
  refs/heads/master fba9cc846 -> 3d0e17424


[SPARK-21845][SQL] Make codegen fallback of expressions configurable

## What changes were proposed in this pull request?
We should make the codegen fallback of expressions configurable. So far, it is 
always on. Since the fallback can hide compilation bugs in our codegen, we 
should also disable it when running test cases.
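
For illustration, a minimal sketch of toggling the new flag at runtime, assuming an active `SparkSession` named `spark` (the key is `spark.sql.codegen.fallback`, as defined in the `SQLConf` change below):

```scala
// Sketch: disable the codegen fallback so that compilation failures in
// generated code surface as errors instead of silently falling back to
// interpreted evaluation.
spark.conf.set("spark.sql.codegen.fallback", "false")

// Restore the default behavior (fallback enabled).
spark.conf.set("spark.sql.codegen.fallback", "true")
```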

## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsm...@gmail.com>

Closes #19062 from gatorsmile/fallbackCodegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0e1742
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0e1742
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0e1742

Branch: refs/heads/master
Commit: 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a
Parents: fba9cc8
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Aug 29 20:59:01 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Aug 29 20:59:01 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala      |  6 +++---
 .../org/apache/spark/sql/execution/SparkPlan.scala   | 15 +++++----------
 .../spark/sql/execution/WholeStageCodegenExec.scala  |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  | 12 +++++++++++-
 .../org/apache/spark/sql/test/SharedSQLContext.scala |  2 ++
 .../org/apache/spark/sql/hive/test/TestHive.scala    |  1 +
 7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099..24f51ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -551,9 +551,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, whole stage codegen could be temporary disabled for the 
part of query that" +
+    .doc("When true, (whole stage) codegen could be temporary disabled for the 
part of query that" +
       " fail to compile generated code")
     .booleanConf
     .createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c2..b1db9dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,14 +56,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // sqlContext will be null when we are being deserialized on the slaves.  In this instance
-  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
-  // constructor has run.
-  val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.conf.subexpressionEliminationEnabled
-  } else {
-    false
-  }
+  // whether we should fallback when hitting compilation errors caused by codegen
+  private val codeGenFallBack = sqlContext.conf.codegenFallback
+
+  protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
@@ -370,8 +366,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e @ (_: JaninoRuntimeException | _: CompileException)
-          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }
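
The pattern above tries the code-generated path first and, only when the fallback flag is on, swallows the Janino compilation failure in favor of the interpreted path. A condensed sketch of that control flow, where `compilePredicate` and `interpretPredicate` are hypothetical stand-ins for `GeneratePredicate.generate` and `genInterpretedPredicate`:

```scala
// Condensed sketch of the fallback pattern; the two by-name parameters
// stand in for the code-generated and interpreted predicate builders.
def newPredicateSketch[T](compilePredicate: => T, interpretPredicate: => T,
    codeGenFallBack: Boolean): T = {
  try {
    compilePredicate // attempt the code-generated path first
  } catch {
    // Swallow the compilation failure only when the fallback is enabled;
    // with spark.sql.codegen.fallback=false the error propagates.
    case _: Exception if codeGenFallBack => interpretPredicate
  }
}
```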

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index bacb709..a41a7ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
        // We should already saw the error message
        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
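
Unlike the predicate fallback in `SparkPlan`, the whole-stage path adds a `!Utils.isTesting` guard, so test JVMs always fail fast on compilation errors regardless of the conf. The decision reduces to:

```scala
// Sketch of the whole-stage fallback decision; isTesting mirrors
// Utils.isTesting (true when spark.testing / SPARK_TESTING is set).
def shouldFallBack(isTesting: Boolean, codegenFallback: Boolean): Boolean =
  !isTesting && codegenFallback
```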

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 0681b9c..50e4759 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         v
       }
       withSQLConf(
-        (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
+        (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
         (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
         val df = spark.range(0, 4, 1, 4).withColumn("c", c)
         val rows = df.collect()
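
`withSQLConf` sets the given entries only for the duration of the block and restores the previous values afterwards, which is what lets this suite exercise each combination of fallback and whole-stage codegen. A minimal sketch of the same helper, as used from any suite mixing in `SQLTestUtils`:

```scala
// Sketch: run assertions with the fallback off and whole-stage codegen on;
// previous conf values are restored when the block exits.
withSQLConf(
  SQLConf.CODEGEN_FALLBACK.key -> "false",
  SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
  // ... code whose codegen compilation errors should fail the test ...
}
```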

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5eb34e5..1334164 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
-    df.filter(filter).count
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
+      df.filter(filter).count()
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
+      val e = intercept[SparkException] {
+        df.filter(filter).count()
+      }.getMessage
+      assert(e.contains("grows beyond 64 KB"))
+    }
   }
 
   test("SPARK-20897: cached self-join should not fail") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 1f073d5..cd8d070 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
   /**
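
With this default in place, every suite built on `SharedSQLContext` runs with the fallback disabled, so codegen compilation bugs fail tests instead of being masked; a test that deliberately needs the fallback opts back in locally with `withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true")`, as the `DataFrameSuite` change above does.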

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0e1742/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 10c9a2d..0f6a81b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,6 +51,7 @@ object TestHive
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")
+        .set(SQLConf.CODEGEN_FALLBACK.key, "false")
         .set("spark.sql.hive.metastore.barrierPrefixes",
           "org.apache.spark.sql.hive.execution.PairSerDe")
         .set("spark.sql.warehouse.dir", 
TestHiveContext.makeWarehouseDir().toURI.getPath)

