Ohad Raviv created SPARK-27707:
----------------------------------

             Summary: Performance issue using explode
                 Key: SPARK-27707
                 URL: https://issues.apache.org/jira/browse/SPARK-27707
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.3, 3.0.0
            Reporter: Ohad Raviv


This is a corner case of SPARK-21657.
We want to explode an array inside a struct while also keeping some other 
fields of the struct, and we again hit a huge performance issue.
Reproduction code:
{code}
// M is the array length; the report does not give a value.
val df = spark.sparkContext.parallelize(Seq(("1",
          Array.fill(M)({
            val i = math.random
            (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
          })))).toDF("col", "arr")
          .selectExpr("col", "struct(col, arr) as st")
          // st.col and explode(st.arr) are selected in the same projection
          .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")

df.write.mode("overwrite").save("/tmp/blah")
{code}

A workaround is to project the struct field before the explode:
{code}
val df = spark.sparkContext.parallelize(Seq(("1",
          Array.fill(M)({
            val i = math.random
            (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
          })))).toDF("col", "arr")
          .selectExpr("col", "struct(col, arr) as st")
          // pull st.col into its own column before the explode
          .withColumn("col1", $"st.col")
          .selectExpr("col", "col1", "explode(st.arr) as arr_col")

df.write.mode("overwrite").save("/tmp/blah")
{code}

In this case the optimization done in SPARK-21657:
{code}
    // prune unrequired references
    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
      val newChild = prunedChild(g.child, requiredAttrs)
      val unrequired = g.generator.references -- p.references
      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
        .map(_._2)
      p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
{code}

doesn't work because `p.references` contains the whole `st` struct as a 
reference, not just the projected field.
As a result, the entire struct, including the huge array field, is duplicated 
once per array element.
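
To make the failure mode concrete, here is a minimal sketch in plain Scala. It is a toy model, not Catalyst: `Expr`, `Attr`, `GetField`, and `unrequired` are hypothetical names invented for illustration, and the only part taken from the rule above is the set difference `g.generator.references -- p.references`. The assumption being modeled is that a field access such as `st.col` reports the whole `st` attribute as its reference.

```scala
// Toy model of the reference bookkeeping; none of these types are Spark classes.
object PruneSketch {
  sealed trait Expr { def references: Set[String] }

  // A reference to a top-level column.
  final case class Attr(name: String) extends Expr {
    def references: Set[String] = Set(name)
  }

  // Field access on a struct, e.g. st.col. Assumption: it reports the whole
  // parent attribute as its reference, not the individual field.
  final case class GetField(child: Expr, field: String) extends Expr {
    def references: Set[String] = child.references
  }

  // Mirrors `unrequired = g.generator.references -- p.references`.
  def unrequired(generatorRefs: Set[String], projectList: Seq[Expr]): Set[String] =
    generatorRefs -- projectList.flatMap(_.references)

  // References of the generator explode(st.arr): just `st`.
  val generatorRefs: Set[String] = GetField(Attr("st"), "arr").references

  // Reported case: the projection above the explode reads st.col directly.
  val reported: Seq[Expr] =
    Seq(Attr("col"), GetField(Attr("st"), "col"), Attr("arr_col"))

  // Workaround: st.col was already projected into col1 below the explode.
  val workaround: Seq[Expr] =
    Seq(Attr("col"), Attr("col1"), Attr("arr_col"))

  def main(args: Array[String]): Unit = {
    // Empty: `st` still counts as required, so it is kept in every generated row.
    println(unrequired(generatorRefs, reported))
    // Contains "st": the struct can be dropped from the generator's output.
    println(unrequired(generatorRefs, workaround))
  }
}
```

In the reported query the difference is empty, so `st` (and the array inside it) is carried along with every exploded element. With the workaround the projection no longer mentions `st`, so the rule can mark it as unrequired.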

I know this is kind of a corner case, but it was really non-trivial to understand.




