[ https://issues.apache.org/jira/browse/SPARK-26680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hovell resolved SPARK-26680.
---------------------------------------
    Resolution: Fixed
      Assignee: Bruce Robbins
 Fix Version/s: 3.0.0
                2.4.1

> StackOverflowError if Stream passed to groupBy
> ----------------------------------------------
>
>                 Key: SPARK-26680
>                 URL: https://issues.apache.org/jira/browse/SPARK-26680
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.0, 3.0.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Major
>             Fix For: 2.4.1, 3.0.0
>
>
> This Java code results in a StackOverflowError:
> {code:java}
> List<Column> groupByCols = new ArrayList<>();
> groupByCols.add(new Column("id1"));
> scala.collection.Seq<Column> groupByColsSeq =
>     JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
>         .asScala().toSeq();
> df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
> {code}
> The {{toSeq}} method above produces a Stream. Passing a Stream to groupBy results in the StackOverflowError. In fact, the error can be produced more easily in spark-shell:
> {noformat}
> scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
> df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
>
> scala> val groupBySeq = Stream(col("id1"))
> groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
>
> scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
> java.lang.StackOverflowError
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
>   at scala.collection.immutable.Stream.drop(Stream.scala:797)
>   at scala.collection.immutable.Stream.drop(Stream.scala:204)
>   at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
>   at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
>   at scala.collection.immutable.Stream.apply(Stream.scala:204)
>   at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
>   at scala.Option.getOrElse(Option.scala:138)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
>   at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
>   at scala.collection.immutable.Stream.drop(Stream.scala:797)
>   at scala.collection.immutable.Stream.drop(Stream.scala:204)
>   at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
>   at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
>   at scala.collection.immutable.Stream.apply(Stream.scala:204)
>   at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
>   at scala.Option.getOrElse(Option.scala:138)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
>   ...etc...
> {noformat}
> This is due to the lazy nature of Streams.
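> (As a caller-side workaround, forcing the Stream into a strict collection before the call should avoid the lazy evaluation described below; a quick, hypothetical sketch against the shell reproduction above:)
> {code:java}
> // Hypothetical workaround: materialize the lazy Stream into a strict List
> // so every Column is evaluated before groupBy ever sees the sequence.
> df.groupBy(groupBySeq.toList: _*).max("id2").toDF("id1", "id2").collect
> {code}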
> The method {{consume}} in {{CodegenSupport}} assumes that a map function will be eagerly evaluated:
> {code:java}
> val inputVars =
>     ctx.currentVars = null   <== the closure cares about this
>     ctx.INPUT_ROW = row
>     output.zipWithIndex.map { case (attr, i) =>
>       BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
> -
> -
> -
>     ctx.currentVars = inputVars
>     ctx.INPUT_ROW = null
>     ctx.freshNamePrefix = parent.variablePrefix
>     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
> {code}
> The closure passed to the map function assumes {{ctx.currentVars}} will be set to null. But due to lazy evaluation, {{ctx.currentVars}} is set to something else by the time the closure is actually called. Worse yet, {{ctx.currentVars}} is set to the yet-to-be-evaluated {{inputVars}} stream. The closure uses {{ctx.currentVars}} (via the call {{genCode(ctx)}}), so it ends up consuming the very data structure it is attempting to create.
> You can recreate the problem in a vanilla Scala shell:
> {code:java}
> scala> var p1: Seq[Any] = null
> p1: Seq[Any] = null
>
> scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
> s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
>
> scala> p1 = s
> p1: Seq[Any] = Stream(1, ?)
>
> scala> s.foreach(println)
> 1
> java.lang.StackOverflowError
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
>   at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
>   at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
>   ... etc ...
> {code}
> Possible fixes:
> - In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to RelationalGroupedDataset (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}).
> - In {{CodegenSupport.consume}}, we could ensure that the map function is eagerly evaluated (simply by moving the existing match statement to handle the result from either path of the if statement); a sketch of the forcing idea follows below.
> - Something else that hasn't occurred to me (opinions welcome).
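> To make the second idea concrete in isolation, here is a minimal sketch against the vanilla-shell reproduction above (an illustration of eager forcing, not the actual Spark patch):
> {code:java}
> // Forcing the mapped Stream while p1 is still null evaluates every
> // closure up front, so the result can no longer refer back to itself.
> var p1: Seq[Any] = null
> val s = Stream(1, 2).zipWithIndex.map { case (x, i) =>
>   if (p1 != null) p1(i) else x
> }.toList  // eager evaluation happens here, before p1 is reassigned
> p1 = s
> s.foreach(println)  // prints 1 and 2; no StackOverflowError
> {code}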