[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-13 Thread Ruslan Yushchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862898#comment-16862898
 ] 

Ruslan Yushchenko commented on SPARK-28016:
---

I attached a self-contained example ([^SparkApp1IssueSelfContained.scala]). 
The example is slightly simplified: only numeric fields are used now. The 
number of transformations can also be changed easily by adjusting the upper 
bound of the `Range()`.

This example also reproduces the issue on the current master. On master it does 
not produce a timeout error; it just freezes.
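
The Range()-driven setup described above can be sketched roughly as follows. 
This is a minimal sketch and not the attached SparkApp1IssueSelfContained.scala: 
the object name NestedProjectionsSketch, the helper addNestedField, the build 
function and the one-record sample are all hypothetical, and only built-in 
Spark SQL functions are used in place of NestedOps. The bound is kept at 3 so 
the job finishes quickly. Per the report, more than 11 transformations is where 
the freeze would be expected, but this sketch has not been confirmed to 
reproduce it.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

object NestedProjectionsSketch {
  // Hypothetical helper (not NestedOps): rebuilds the struct column `parent`,
  // keeping its existing fields and appending `newField` = f(parent.srcField).
  def addNestedField(df: DataFrame, parent: String, srcField: String, newField: String)
                    (f: Column => Column): DataFrame = {
    val existing = df.schema(parent).dataType.asInstanceOf[StructType]
      .fieldNames.map(name => col(s"$parent.$name").as(name))
    val added = f(col(s"$parent.$srcField")).as(newField)
    df.withColumn(parent, struct((existing :+ added): _*))
  }

  // Applies `n` negation transformations; `n` plays the role of the
  // Range() upper bound mentioned in the comment.
  def build(spark: SparkSession, n: Int): DataFrame = {
    import spark.implicits._
    // Assumed one-record sample with numeric fields only.
    val sample = Seq("""{"numerics": {"small_positive": 722, "small_negative": -660}}""")
    val df = spark.read.json(sample.toDS)
    Range(0, n).foldLeft(df) { (acc, i) =>
      addNestedField(acc, "numerics", "small_positive", s"num$i")(c => -c)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Nested projections sketch")
      .master("local[2]")
      .getOrCreate()
    build(spark, 3).printSchema()
    spark.stop()
  }
}
```

Each call rebuilds the whole struct, so every step adds another projection over 
the same nested column, which is the plan shape the issue describes.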

> Spark hangs when an execution plan has many projections on nested structs
> -
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.4.3
> Environment: Tried in
>  * Spark 2.2.1 and Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / YARN on a Linux cluster
> Reporter: Ruslan Yushchenko
> Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, 
> SparkApp1IssueSelfContained.scala, SparkApp2Workaround.scala, 
> spark-app-nested.tgz
>
>
> Spark applications freeze at the execution plan optimization stage (Catalyst) 
> when a logical execution plan contains many projections that operate on 
> nested struct fields.
> Two Spark applications are attached. One demonstrates the issue, the other 
> demonstrates a workaround. An archive is also attached in which these jobs 
> are packaged as a Maven project.
> To reproduce the issue, the attached Spark app does the following:
>  * A small dataframe is created from a JSON example.
>  * A nested withColumn map transformation is used to apply a transformation 
> to a struct field and create a new struct field. The code for this 
> transformation is also attached. 
>  * Once more than 11 such transformations are applied, the Catalyst optimizer 
> freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
>
> object SparkApp1Issue {
>   // Sample data for a dataframe with nested structs
>   val sample =
>     """
>       |{
>       |  "strings": {
>       |    "simple": "Culpa repellat nesciunt accusantium",
>       |    "all_random": "DESebo8d%fL9sX@AzVin",
>       |    "whitespaces": "qbbl"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 722,
>       |    "small_negative": -660,
>       |    "big_positive": 669223368251997,
>       |    "big_negative": -161176863305841,
>       |    "zero": 0
>       |  }
>       |}
>     """.stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Accusamus quia vel deleniti",
>       |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
>       |    "whitespaces": "  t e   t   rp   z p"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 268,
>       |    "small_negative": -134,
>       |    "big_positive": 768990048149640,
>       |    "big_negative": -684718954884696,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Quia numquam deserunt delectus rem est",
>       |    "all_random": "GmRdQlE4Avn1hSlVPAH",
>       |    "whitespaces": "   c   sayv   drf "
>       |  },
>       |  "numerics": {
>       |    "small_positive": 909,
>       |    "small_negative": -363,
>       |    "big_positive": 592517494751902,
>       |    "big_negative": -703224505589638,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin :: Nil
>
>   /**
>     * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
>     * involving nested structs in an execution plan.
>     *
>     * The example works as follows:
>     * - A small dataframe is created from a JSON example above
>     * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>     *   a new struct field.
>     * - Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing
>     *   the execution plan
>     */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>
>     val df = spark.read.json(sample.toDS)
>
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upp

[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Ruslan Yushchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862752#comment-16862752
 ] 

Ruslan Yushchenko commented on SPARK-28016:
---

Thank you for your quick response.
 * I will make the example self-contained.
 * I will run it on the current master and let you know whether I can reproduce it.


[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862693#comment-16862693
 ] 

Hyukjin Kwon commented on SPARK-28016:
--

I cannot reproduce this on the current master.

{code}
scala> dfOutput.show
19/06/13 12:21:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------------+--------------------+
|            numerics|             strings|
+--------------------+--------------------+
|[-161176863305841...|[DESebo8d%fL9sX@A...|
|[-684718954884696...|[rY&n9UnVcD*KS]jP...|
|[-703224505589638...|[GmRdQlE4Avn1hSlV...|
+--------------------+--------------------+
{code}
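
The WARN line in the output above concerns only how the plan is printed. A 
minimal sketch of raising that limit follows, assuming a Spark build that 
recognizes the 'spark.sql.debug.maxToStringFields' key named in the warning. 
The value 200 and the names MaxToStringFieldsSketch and session are arbitrary 
choices for illustration.

```scala
import org.apache.spark.sql.SparkSession

object MaxToStringFieldsSketch {
  // Builds a local session with a larger plan-printing limit.
  // The key is taken from the warning text; the value 200 is an assumption.
  def session(): SparkSession =
    SparkSession.builder()
      .appName("maxToStringFields sketch")
      .master("local[2]")
      .config("spark.sql.debug.maxToStringFields", "200")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = session()
    // Print the effective value to confirm the setting was picked up.
    println(spark.conf.get("spark.sql.debug.maxToStringFields"))
    spark.stop()
  }
}
```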


[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

2019-06-12 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862682#comment-16862682
 ] 

Hyukjin Kwon commented on SPARK-28016:
--

Can you narrow this down and make a self-contained reproducer that does not 
depend on custom code such as {{DataSetWrapper}}? 

> Spark hangs when an execution plan has many projections on nested structs
> -
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.4.3
> Environment: Tried in
>  * Spark 2.2.1 and Spark 2.4.3 in local mode on Linux, macOS and Windows
>  * Spark 2.4.3 / YARN on a Linux cluster
> Reporter: Ruslan Yushchenko
> Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, 
> SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze at the execution plan optimization stage (Catalyst) 
> when a logical execution plan contains many projections that operate on 
> nested struct fields.
> Two Spark applications are attached. One demonstrates the issue, the other 
> demonstrates a workaround. An archive is also attached in which these jobs 
> are packaged as a Maven project.
> To reproduce the issue, the attached Spark app does the following:
>  * A small dataframe is created from a JSON example.
>  * A nested withColumn map transformation is used to apply a transformation 
> to a struct field and create a new struct field. The code for this 
> transformation is also attached. 
>  * Once more than 11 such transformations are applied, the Catalyst optimizer 
> freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
>
> object SparkApp1Issue {
>   // Sample data for a dataframe with nested structs
>   val sample =
>     """
>       |{
>       |  "strings": {
>       |    "simple": "Culpa repellat nesciunt accusantium",
>       |    "all_random": "DESebo8d%fL9sX@AzVin",
>       |    "whitespaces": "qbbl"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 722,
>       |    "small_negative": -660,
>       |    "big_positive": 669223368251997,
>       |    "big_negative": -161176863305841,
>       |    "zero": 0
>       |  }
>       |}
>     """.stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Accusamus quia vel deleniti",
>       |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
>       |    "whitespaces": "  t e   t   rp   z p"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 268,
>       |    "small_negative": -134,
>       |    "big_positive": 768990048149640,
>       |    "big_negative": -684718954884696,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Quia numquam deserunt delectus rem est",
>       |    "all_random": "GmRdQlE4Avn1hSlVPAH",
>       |    "whitespaces": "   c   sayv   drf "
>       |  },
>       |  "numerics": {
>       |    "small_positive": 909,
>       |    "small_negative": -363,
>       |    "big_positive": 592517494751902,
>       |    "big_negative": -703224505589638,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin :: Nil
>
>   /**
>     * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
>     * involving nested structs in an execution plan.
>     *
>     * The example works as follows:
>     * - A small dataframe is created from a JSON example above
>     * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>     *   a new struct field.
>     * - Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing
>     *   the execution plan
>     */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>
>     val df = spark.read.json(sample.toDS)
>
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
>       .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
>       .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
>       .nestedWithColumnMap("numerics.small_n