Repository: spark
Updated Branches:
  refs/heads/branch-2.2 f99456b5f -> 92837aeb4


[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to the 64KB bytecode size limit

## What changes were proposed in this pull request?

When the expression passed to `df.filter()` has many nodes (e.g. 400), the 
Java bytecode generated for it exceeds the JVM's 64KB method size limit and 
Janino throws a Java exception, so the query fails.
This PR catches that exception, disables code generation for the predicate, 
and continues execution by evaluating the expression with `Expression.eval()`.
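
At its core, the fix wraps predicate code generation in a try/catch and falls
back to an interpreted predicate (gated by `sqlContext.conf.wholeStageFallback`
in the `SparkPlan` diff below). A minimal sketch of that pattern follows;
`RowPredicate` and the factory parameters are illustrative placeholders rather
than Spark API, and only the exception types are taken from the actual change:

```scala
// Illustrative sketch of the codegen-to-interpreted fallback, not Spark's code.
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.JaninoRuntimeException

object PredicateFallbackSketch {
  // Placeholder for a compiled or interpreted predicate over a row.
  trait RowPredicate { def eval(row: Any): Boolean }

  // Try the code-generated predicate first; if Janino rejects the generated
  // class (e.g. a method over the JVM's 64KB bytecode limit) and fallback is
  // allowed, keep the query running with the interpreted predicate instead.
  def newPredicate(
      codegen: () => RowPredicate,
      interpreted: () => RowPredicate,
      fallbackEnabled: Boolean): RowPredicate = {
    try {
      codegen()
    } catch {
      case _: JaninoRuntimeException | _: CompileException if fallbackEnabled =>
        interpreted()
    }
  }
}
```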

## How was this patch tested?

Added a test case to `DataFrameSuite`.

Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>

Closes #17087 from kiszk/SPARK-19372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92837aeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92837aeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92837aeb

Branch: refs/heads/branch-2.2
Commit: 92837aeb47fc3427166e4b6e62f6130f7480d7fa
Parents: f99456b
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Authored: Tue May 16 14:47:21 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri May 26 11:15:21 2017 -0700

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogUtils.scala |  2 +-
 .../expressions/codegen/CodeGenerator.scala     | 21 ++++++++++++++----
 .../sql/catalyst/expressions/predicates.scala   | 10 +++++----
 .../apache/spark/sql/execution/SparkPlan.scala  | 23 +++++++++++++++++++-
 .../PartitioningAwareFileIndex.scala            |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 11 ++++++++++
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 +-
 7 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 3ca9e6a..1fc3a65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -155,7 +155,7 @@ object ExternalCatalogUtils {
         })
 
       inputPartitions.filter { p =>
-        boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
+        boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 760ead4..f8da78b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -27,7 +27,10 @@ import scala.language.existentials
 import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
-import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
 
 import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
@@ -899,8 +902,14 @@ object CodeGenerator extends Logging {
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  def compile(code: CodeAndComment): GeneratedClass = {
+  def compile(code: CodeAndComment): GeneratedClass = try {
     cache.get(code)
+  } catch {
+    // Cache.get() may wrap the original exception. See the following URL
+    // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
+    //   Cache.html#get(K,%20java.util.concurrent.Callable)
+    case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
+      throw e.getCause
   }
 
   /**
@@ -951,10 +960,14 @@ object CodeGenerator extends Logging {
       evaluator.cook("generated.java", code.body)
       recordCompilationStats(evaluator)
     } catch {
-      case e: Exception =>
+      case e: JaninoRuntimeException =>
         val msg = s"failed to compile: $e\n$formatted"
         logError(msg, e)
-        throw new Exception(msg, e)
+        throw new JaninoRuntimeException(msg, e)
+      case e: CompileException =>
+        val msg = s"failed to compile: $e\n$formatted"
+        logError(msg, e)
+        throw new CompileException(msg, e.getLocation)
     }
     evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 5034566..c15ee2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -20,20 +20,22 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
 
 
 object InterpretedPredicate {
-  def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
+  def create(expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate =
     create(BindReferences.bindReference(expression, inputSchema))
 
-  def create(expression: Expression): (InternalRow => Boolean) = {
-    (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
-  }
+  def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
 }
 
+case class InterpretedPredicate(expression: Expression) extends BasePredicate {
+  override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
+}
 
 /**
  * An [[Expression]] that returns a boolean value.

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index cadab37..c4ed966 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -22,6 +22,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext
 
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.JaninoRuntimeException
+
 import org.apache.spark.{broadcast, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
@@ -353,9 +356,27 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
   }
 
+  private def genInterpretedPredicate(
+      expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = {
+    val str = expression.toString
+    val logMessage = if (str.length > 256) {
+      str.substring(0, 256 - 3) + "..."
+    } else {
+      str
+    }
+    logWarning(s"Codegen disabled for this expression:\n $logMessage")
+    InterpretedPredicate.create(expression, inputSchema)
+  }
+
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
-    GeneratePredicate.generate(expression, inputSchema)
+    try {
+      GeneratePredicate.generate(expression, inputSchema)
+    } catch {
+      case e @ (_: JaninoRuntimeException | _: CompileException)
+          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+        genInterpretedPredicate(expression, inputSchema)
+    }
   }
 
   protected def newOrdering(

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffd7f6c..6b6f638 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -177,7 +177,7 @@ abstract class PartitioningAwareFileIndex(
       })
 
       val selected = partitions.filter {
-        case PartitionPath(values, _) => boundPredicate(values)
+        case PartitionPath(values, _) => boundPredicate.eval(values)
       }
       logInfo {
         val total = partitions.length

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b4893b5..e12e396 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1732,4 +1732,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .filter($"x1".isNotNull || !$"y".isin("a!"))
       .count
   }
+
+  test("SPARK-19372: Filter can be executed w/o generated code due to JVM code 
size limit") {
+    val N = 400
+    val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
+    val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
+    val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
+
+    val filter = (0 until N)
+      .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
+    df.filter(filter).count
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/92837aeb/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9f4009b..60a4638 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -103,7 +103,7 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
              // `Cast`ed values are always of internal types (e.g. UTF8String instead of String)
               Cast(Literal(value), dataType).eval()
           })
-        }.filter(predicate).map(projection)
+        }.filter(predicate.eval).map(projection)
 
       // Appends partition values
      val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes

