Repository: spark Updated Branches: refs/heads/master ae61f187a -> 83488cc31
[SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than spark.sql.codegen.hugeMethodLimit ## What changes were proposed in this pull request? When exceeding `spark.sql.codegen.hugeMethodLimit`, the runtime fallbacks to the Volcano iterator solution. This could cause an infinite loop when `FileSourceScanExec` can use the columnar batch to read the data. This PR is to fix the issue. ## How was this patch tested? Added a test Author: gatorsmile <gatorsm...@gmail.com> Closes #19440 from gatorsmile/testt. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83488cc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83488cc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83488cc3 Branch: refs/heads/master Commit: 83488cc3180ca18f829516f550766efb3095881e Parents: ae61f18 Author: gatorsmile <gatorsm...@gmail.com> Authored: Thu Oct 5 23:33:49 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Oct 5 23:33:49 2017 -0700 ---------------------------------------------------------------------- .../sql/execution/WholeStageCodegenExec.scala | 12 ++++++---- .../sql/execution/WholeStageCodegenSuite.scala | 23 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 9073d59..1aaaf89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co // Check if compiled code has a too large function if (maxCodeSize > sqlContext.conf.hugeMethodLimit) { - logWarning(s"Found too long generated codes and JIT optimization might not work: " + - s"the bytecode size was $maxCodeSize, this value went over the limit " + + logInfo(s"Found too long generated codes and JIT optimization might not work: " + + s"the bytecode size ($maxCodeSize) is above the limit " + s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " + s"for this plan. To avoid this, you can raise the limit " + - s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString") - return child.execute() + s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString") + child match { + // The fallback solution of batch file source scan still uses WholeStageCodegenExec + case f: FileSourceScanExec if f.supportsBatch => // do nothing + case _ => return child.execute() + } } val references = ctx.references.toArray http://git-wip-us.apache.org/repos/asf/spark/blob/83488cc3/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index aaa77b3..098e4cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.Row +import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructType} -class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { +class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { test("range/filter should be combined") { val df = spark.range(10).filter("id = 1").selectExpr("id + 1") @@ -185,4 +185,23 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions) assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) } + + test("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") { + import testImplicits._ + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202", + SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") { + // wide table batch scan causes the byte code of codegen exceeds the limit of + // WHOLESTAGE_HUGE_METHOD_LIMIT + val df2 = spark.read.parquet(path) + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch) + checkAnswer(df2, df) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org