Bruce Robbins created SPARK-26680:
-------------------------------------

             Summary: StackOverflowError if Stream passed to groupBy
                 Key: SPARK-26680
                 URL: https://issues.apache.org/jira/browse/SPARK-26680
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Bruce Robbins


This Java code results in a StackOverflowError:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import scala.collection.JavaConverters;

// df is an existing Dataset<Row> with columns id1 and id2
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
    JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
        .asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
{code}
The {{toSeq}} method above produces a Stream. Passing a Stream to {{groupBy}} 
results in a StackOverflowError. In fact, the error can be produced more 
easily in spark-shell:
{noformat}
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
...etc...
{noformat}
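For reference, the Java snippet above ends up with a Stream because, in Scala 2.12, {{Iterator.toSeq}} (via {{TraversableOnce}}) is implemented in terms of {{toStream}}, so converting an iterator with {{asScala.toSeq}} yields a lazy Stream. A quick check in spark-shell:
{noformat}
scala> List(col("id1")).iterator.toSeq
res0: Seq[org.apache.spark.sql.Column] = Stream(id1, ?)
{noformat}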
This is due to the lazy nature of Streams. The {{consume}} method in 
{{CodegenSupport}} assumes that the closure passed to a {{map}} call will be 
evaluated eagerly:
{code:scala}
    val inputVars =
        ctx.currentVars = null   // <== the closure cares about this
        ctx.INPUT_ROW = row
        output.zipWithIndex.map { case (attr, i) =>
          BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
        }
    ...
    ctx.currentVars = inputVars
    ctx.INPUT_ROW = null
    ctx.freshNamePrefix = parent.variablePrefix
    val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
{code}
The closure passed to the map function assumes {{ctx.currentVars}} will still be 
null when it runs. But because the Stream is lazy, the closure is not called 
until later, by which time {{ctx.currentVars}} has been reassigned. Worse yet, it 
has been reassigned to the not-yet-evaluated {{inputVars}} stream itself. Since 
the closure reads {{ctx.currentVars}} (via the call {{genCode(ctx)}}), it ends up 
consuming the very data structure it is attempting to create.

You can recreate the problem in a vanilla Scala shell:
{code:java}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null
scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
scala> p1 = s
p1: Seq[Any] = Stream(1, ?)
scala> s.foreach(println)
1
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
... etc ...
{code}
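For comparison, forcing the {{map}} result eagerly (for example with {{.toList}}) avoids the self-reference; this is the idea behind the second fix listed below:
{noformat}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null
scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }.toList
s: List[Any] = List(1, 2)
scala> p1 = s
p1: Seq[Any] = List(1, 2)
scala> s.foreach(println)
1
2
{noformat}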
Possible fixes:
 - In {{Dataset.groupBy}}, we could ensure the passed Seq is strict before handing it to {{RelationalGroupedDataset}}, simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}} (sketched below).
 - In {{CodegenSupport}}, we could ensure that the {{map}} result is eagerly evaluated, simply by appending {{.toList}} to the construct.
 - Something else that hasn't occurred to me (opinions welcome).
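A minimal sketch of the first option. The body shown here is approximately the current {{Dataset.groupBy}}; the only change is the added {{toList}}, which forces the Stream before {{RelationalGroupedDataset}} captures it:
{code:scala}
// Dataset.scala (sketch): force the incoming Seq to a strict List so that
// later code generation never sees a lazily-evaluated Stream of expressions.
@scala.annotation.varargs
def groupBy(cols: Column*): RelationalGroupedDataset = {
  RelationalGroupedDataset(toDF(), cols.toList.map(_.expr), RelationalGroupedDataset.GroupByType)
}
{code}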


