Bruce Robbins created SPARK-26680:
-------------------------------------

             Summary: StackOverflowError if Stream passed to groupBy
                 Key: SPARK-26680
                 URL: https://issues.apache.org/jira/browse/SPARK-26680
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Bruce Robbins
This Java code results in a StackOverflowError:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import scala.collection.JavaConverters;

// df is a Dataset<Row> with columns id1 and id2
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
    JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
        .asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
{code}
The {{toSeq}} method above produces a Stream. Passing a Stream to {{groupBy}} results in the StackOverflowError.

In fact, the error can be produced more easily in spark-shell:
{noformat}
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)

scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  ...etc...
{noformat}
This is due to the lazy nature of Streams. The method {{consume}} in {{CodegenSupport}} assumes that the following map function will be eagerly evaluated:
{code:scala}
val inputVars =
    ctx.currentVars = null   <== the closure cares about this
    ctx.INPUT_ROW = row
    output.zipWithIndex.map { case (attr, i) =>
      BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
 - - -
ctx.currentVars = inputVars
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
{code}
The closure passed to the map function assumes {{ctx.currentVars}} will still be set to null when it runs. But due to lazy evaluation, {{ctx.currentVars}} has been set to something else by the time the closure is actually called. Worse yet, {{ctx.currentVars}} has been set to the yet-to-be-evaluated {{inputVars}} Stream itself.
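As a caller-side workaround, forcing the sequence into a strict collection before the call appears to avoid the error. A minimal sketch, run in the same spark-shell session as the reproduction above:
{code:scala}
// groupBySeq.toList materializes the Stream, so groupBy receives a strict List
df.groupBy(groupBySeq.toList: _*).max("id2").toDF("id1", "id2").collect
{code}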
The closure uses {{ctx.currentVars}} (via the call {{genCode(ctx)}}), so it ends up consuming the very data structure it is attempting to create. You can recreate the problem in a vanilla Scala shell:
{code:scala}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null

scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)

scala> p1 = s
p1: Seq[Any] = Stream(1, ?)

scala> s.foreach(println)
1
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  ... etc ...
{code}
Possible fixes:
- In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to {{RelationalGroupedDataset}} (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}).
- In {{CodegenSupport}}, we could ensure that the map function is eagerly evaluated (simply by adding {{.toList}} to the construct), as sketched below.
- Something else that hasn't occurred to me (opinions welcome).
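For what it's worth, the {{.toList}} approach is enough to make the vanilla Scala reproduction above behave, because the closure is then evaluated while {{p1}} is still null. A minimal sketch of the idea against the toy example (not a tested patch to {{CodegenSupport}}):
{code:scala}
var p1: Seq[Any] = null
// .toList forces the map to run immediately, while p1 is still null,
// rather than lazily after p1 has been pointed at the result
val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }.toList
p1 = s
s.foreach(println)  // prints 1 and 2; no StackOverflowError
{code}
Presumably the same forcing would need to happen in {{consume}} before {{ctx.currentVars}} is reassigned to {{inputVars}}.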