Repository: spark
Updated Branches:
  refs/heads/master 468da03e2 -> 7504bc73f


[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code

## What changes were proposed in this pull request?

If a bug in whole-stage codegen produces generated code that can't be 
compiled, we should fall back to non-codegen so that the query can still run.

The batch mode of the new Parquet reader depends on codegen and can't easily be 
switched to non-batch mode, so we still use codegen for the batched scan (for 
Parquet). Because it only supports primitive types and the number of columns is 
less than spark.sql.codegen.maxFields (100), it should not fail.

This behavior can be configured with `spark.sql.codegen.fallback`.

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu <dav...@databricks.com>

Closes #13501 from davies/codegen_fallback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7504bc73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7504bc73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7504bc73

Branch: refs/heads/master
Commit: 7504bc73f20fe0e6546a019ed91c3fd3804287ba
Parents: 468da03
Author: Davies Liu <dav...@databricks.com>
Authored: Fri Jun 10 21:12:06 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Jun 10 21:12:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala     |  5 ++++-
 .../spark/sql/execution/WholeStageCodegenExec.scala      | 11 ++++++++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 ++++++++++-
 3 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd1..ee72a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
+    // in the case of fallback, this batched scan should never fail because of:
+    // 1) only primitive types are supported
+    // 2) the number of columns should be smaller than 
spark.sql.codegen.maxFields
+    WholeStageCodegenExec(this).execute()
   }
 
   override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35..ac4c3aa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) 
extends UnaryExecNode with Co
       new CodeAndComment(CodeFormatter.stripExtraNewLines(source), 
ctx.getPlaceHolderToComments()))
 
     logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
-    CodeGenerator.compile(cleanedSource)
     (ctx, cleanedSource)
   }
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
+    // try to compile and fallback if it failed
+    try {
+      CodeGenerator.compile(cleanedSource)
+    } catch {
+      case e: Exception if !Utils.isTesting && 
sqlContext.conf.wholeStageFallback =>
+        // We should already saw the error message
+        logWarning(s"Whole-stage codegen disabled for this plan:\n 
$treeString")
+        return child.execute()
+    }
     val references = ctx.references.toArray
 
     val durationMs = longMetric("pipelineTime")

http://git-wip-us.apache.org/repos/asf/spark/blob/7504bc73/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 437e093..27b1fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -433,7 +433,14 @@ object SQLConf {
     .doc("The maximum number of fields (including nested fields) that will be 
supported before" +
       " deactivating whole-stage codegen.")
     .intConf
-    .createWithDefault(200)
+    .createWithDefault(100)
+
+  val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
+    .internal()
+    .doc("When true, whole stage codegen could be temporary disabled for the 
part of query that" +
+      " fail to compile generated code")
+    .booleanConf
+    .createWithDefault(true)
 
   val MAX_CASES_BRANCHES = 
SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
     .internal()
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to