Repository: spark Updated Branches: refs/heads/master 9150bca47 -> 6f62e9d9b
[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to 64KB bytecode size limit ## What changes were proposed in this pull request? When an expression for `df.filter()` has many nodes (e.g. 400), the size of Java bytecode for the generated Java code is more than 64KB. It produces a Java exception. As a result, the execution fails. This PR continues to execute by calling `Expression.eval()` disabling code generation if an exception has been caught. ## How was this patch tested? Add a test suite into `DataFrameSuite` Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Closes #17087 from kiszk/SPARK-19372. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f62e9d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f62e9d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f62e9d9 Branch: refs/heads/master Commit: 6f62e9d9b9fa9b5adb461f0de7f176e4589a4d66 Parents: 9150bca Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Authored: Tue May 16 14:47:21 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue May 16 14:47:21 2017 -0700 ---------------------------------------------------------------------- .../catalyst/catalog/ExternalCatalogUtils.scala | 2 +- .../expressions/codegen/CodeGenerator.scala | 21 ++++++++++++++---- .../sql/catalyst/expressions/predicates.scala | 10 +++++---- .../apache/spark/sql/execution/SparkPlan.scala | 23 +++++++++++++++++++- .../PartitioningAwareFileIndex.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 11 ++++++++++ .../spark/sql/sources/SimpleTextRelation.scala | 2 +- 7 files changed, 59 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 3ca9e6a..1fc3a65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -155,7 +155,7 @@ object ExternalCatalogUtils { }) inputPartitions.filter { p => - boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId)) + boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 760ead4..f8da78b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -27,7 +27,10 @@ import scala.language.existentials import scala.util.control.NonFatal import com.google.common.cache.{CacheBuilder, CacheLoader} -import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler} +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} +import org.apache.commons.lang3.exception.ExceptionUtils +import org.codehaus.commons.compiler.CompileException +import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler} import org.codehaus.janino.util.ClassFile import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException} @@ -899,8 +902,14 @@ object 
CodeGenerator extends Logging { /** * Compile the Java source code into a Java class, using Janino. */ - def compile(code: CodeAndComment): GeneratedClass = { + def compile(code: CodeAndComment): GeneratedClass = try { cache.get(code) + } catch { + // Cache.get() may wrap the original exception. See the following URL + // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/ + // Cache.html#get(K,%20java.util.concurrent.Callable) + case e @ (_: UncheckedExecutionException | _: ExecutionError) => + throw e.getCause } /** @@ -951,10 +960,14 @@ object CodeGenerator extends Logging { evaluator.cook("generated.java", code.body) recordCompilationStats(evaluator) } catch { - case e: Exception => + case e: JaninoRuntimeException => val msg = s"failed to compile: $e\n$formatted" logError(msg, e) - throw new Exception(msg, e) + throw new JaninoRuntimeException(msg, e) + case e: CompileException => + val msg = s"failed to compile: $e\n$formatted" + logError(msg, e) + throw new CompileException(msg, e.getLocation) } evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass] } http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 5034566..c15ee2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -20,20 +20,22 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} 
+import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ object InterpretedPredicate { - def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) = + def create(expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = create(BindReferences.bindReference(expression, inputSchema)) - def create(expression: Expression): (InternalRow => Boolean) = { - (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean] - } + def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression) } +case class InterpretedPredicate(expression: Expression) extends BasePredicate { + override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] +} /** * An [[Expression]] that returns a boolean value. http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index cadab37..c4ed966 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -22,6 +22,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext +import org.codehaus.commons.compiler.CompileException +import org.codehaus.janino.JaninoRuntimeException + import org.apache.spark.{broadcast, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec @@ -353,9 +356,27 @@ abstract class SparkPlan extends 
QueryPlan[SparkPlan] with Logging with Serializ GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination) } + private def genInterpretedPredicate( + expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = { + val str = expression.toString + val logMessage = if (str.length > 256) { + str.substring(0, 256 - 3) + "..." + } else { + str + } + logWarning(s"Codegen disabled for this expression:\n $logMessage") + InterpretedPredicate.create(expression, inputSchema) + } + protected def newPredicate( expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = { - GeneratePredicate.generate(expression, inputSchema) + try { + GeneratePredicate.generate(expression, inputSchema) + } catch { + case e @ (_: JaninoRuntimeException | _: CompileException) + if sqlContext == null || sqlContext.conf.wholeStageFallback => + genInterpretedPredicate(expression, inputSchema) + } } protected def newOrdering( http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index ffd7f6c..6b6f638 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -177,7 +177,7 @@ abstract class PartitioningAwareFileIndex( }) val selected = partitions.filter { - case PartitionPath(values, _) => boundPredicate(values) + case PartitionPath(values, _) => boundPredicate.eval(values) } logInfo { val total = partitions.length 
http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index ef0de6f..2f52192 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1844,4 +1844,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .filter($"x1".isNotNull || !$"y".isin("a!")) .count } + + test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { + val N = 400 + val rows = Seq(Row.fromSeq(Seq.fill(N)("string"))) + val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType))) + val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema) + + val filter = (0 until N) + .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string")) + df.filter(filter).count + } } http://git-wip-us.apache.org/repos/asf/spark/blob/6f62e9d9/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 9f4009b..60a4638 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -103,7 +103,7 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { // `Cast`ed values are always of internal types (e.g. 
UTF8String instead of String) Cast(Literal(value), dataType).eval() }) - }.filter(predicate).map(projection) + }.filter(predicate.eval).map(projection) // Appends partition values val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org