[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ruslan Yushchenko updated SPARK-28016:
--------------------------------------
    Attachment: NestedOps.scala

> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
>                 Key: SPARK-28016
>                 URL: https://issues.apache.org/jira/browse/SPARK-28016
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.3
>         Environment: Tried in
>  * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / Yarn on a Linux cluster
>            Reporter: Ruslan Yushchenko
>            Priority: Major
>         Attachments: NestedOps.scala, SparkApp1Issue.scala, SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze during the execution plan optimization stage
> (Catalyst) when a logical execution plan contains many projections that
> operate on nested struct fields.
> Two Spark applications are attached: one demonstrates the issue, the other
> demonstrates a workaround. An archive is also attached in which these jobs
> are packaged as a Maven project.
> To reproduce the issue, the attached Spark app does the following:
>  * A small dataframe is created from a JSON sample.
>  * A nested withColumn map transformation is used to apply a transformation
> on a struct field and create a new struct field. The code for this
> transformation is also attached.
>  * Once more than 11 such transformations are applied, the Catalyst optimizer
> freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // A sample data for a dataframe with nested structs
>   val sample =
>     """
>       |{
>       |  "strings": {
>       |    "simple": "Culpa repellat nesciunt accusantium",
>       |    "all_random": "DESebo8d%fL9sX@AzVin",
>       |    "whitespaces": "    q    bb    l    "
>       |  },
>       |  "numerics": {
>       |    "small_positive": 722,
>       |    "small_negative": -660,
>       |    "big_positive": 669223368251997,
>       |    "big_negative": -161176863305841,
>       |    "zero": 0
>       |  }
>       |}
>     """.stripMargin ::
>       """{
>         |  "strings": {
>         |    "simple": "Accusamus quia vel deleniti",
>         |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
>         |    "whitespaces": "  t e   t   rp   z p"
>         |  },
>         |  "numerics": {
>         |    "small_positive": 268,
>         |    "small_negative": -134,
>         |    "big_positive": 768990048149640,
>         |    "big_negative": -684718954884696,
>         |    "zero": 0
>         |  }
>         |}
>         |""".stripMargin ::
>       """{
>         |  "strings": {
>         |    "simple": "Quia numquam deserunt delectus rem est",
>         |    "all_random": "GmRdQlE4Avn1hSlVPAH",
>         |    "whitespaces": "   c   sa    yv   drf "
>         |  },
>         |  "numerics": {
>         |    "small_positive": 909,
>         |    "small_negative": -363,
>         |    "big_positive": 592517494751902,
>         |    "big_negative": -703224505589638,
>         |    "zero": 0
>         |  }
>         |}
>         |""".stripMargin :: Nil
>   /**
>     * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
>     * involving nested structs in an execution plan.
>     *
>     * The example works as follows:
>     * - A small dataframe is created from a JSON example above
>     * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>     *   a new struct field.
>     * - Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing
>     *   the execution plan
>     */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>     val df = spark.read.json(sample.toDS)
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
>       .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
>       .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
>       .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => -c)
>       .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
>       .nestedWithColumnMap("numerics.big_negative", "numerics.num4", c => -c)
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num5", c => -c)
>       .nestedWithColumnMap("numerics.small_negative", "numerics.num6", c => -c)
>       .nestedWithColumnMap("numerics.big_positive", "numerics.num7", c => -c)
>       .nestedWithColumnMap("numerics.big_negative", "numerics.num8", c => -c)
>       // Uncommenting the line below (a 12th transformation) will cause Catalyst to freeze completely
>       //.nestedWithColumnMap("numerics.big_negative", "numerics.num9", c => -c)
>     dfOutput.printSchema()
>     dfOutput.explain(true)
>     dfOutput.show
>   }
> }
> {code}
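For readers without the attachments, below is a minimal sketch of what a nestedWithColumnMap-style transformation can look like using only Spark 2.4's public API. It is an assumption, not the code in the attached NestedOps.scala: the names NestedMapSketch and nestedWithColumnMapSketch are hypothetical, and only one level of nesting ("parent.field") is handled. Because Spark 2.4 has no built-in way to add a field to an existing struct, the sketch rebuilds the whole struct with functions.struct and replaces the top-level column via withColumn.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// Hypothetical stand-in for NestedOps.nestedWithColumnMap (the attached
// NestedOps.scala is not reproduced here). Supports one nesting level only,
// e.g. "strings.simple" -> "strings.uppercase1".
object NestedMapSketch {
  def nestedWithColumnMapSketch(
      df: DataFrame,
      inputPath: String,
      outputPath: String,
      f: Column => Column): DataFrame = {
    val Array(parent, inField) = inputPath.split('.')
    val Array(outParent, outField) = outputPath.split('.')
    require(parent == outParent, "input and output fields must share a parent struct")

    val parentType = df.schema(parent).dataType.asInstanceOf[StructType]
    // Keep every existing field of the struct, in its original order...
    val existing: Seq[Column] =
      parentType.fieldNames.toSeq.map(name => col(s"$parent.$name").as(name))
    // ...and append the transformed copy of the input field.
    val added: Column = f(col(s"$parent.$inField")).as(outField)

    // Replacing the top-level column adds one projection that rebuilds the struct.
    df.withColumn(parent, struct((existing :+ added): _*))
  }
}
```

With this sketch, chaining a dozen calls as in SparkApp1Issue stacks one projection per call, each rebuilding the full struct.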



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
