[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs
[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862898#comment-16862898 ]

Ruslan Yushchenko commented on SPARK-28016:
-------------------------------------------

Attached a self-contained example ([^SparkApp1IssueSelfContained.scala]). The example is slightly simplified: only numeric fields are used now. Also, the number of transformations can now be changed easily by adjusting the upper bound of the `Range()`.

This example also reproduces the issue on the current master. On master it does not produce a timeout error; it just freezes.

> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
>                 Key: SPARK-28016
>                 URL: https://issues.apache.org/jira/browse/SPARK-28016
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.3
>         Environment: Tried in
> * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
> * Spark 2.4.3 / Yarn on a Linux cluster
>            Reporter: Ruslan Yushchenko
>            Priority: Major
>         Attachments: NestedOps.scala, SparkApp1Issue.scala, SparkApp1IssueSelfContained.scala, SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze at the execution plan optimization stage (Catalyst) when a logical execution plan contains many projections that operate on nested struct fields.
> Two Spark applications are attached. One demonstrates the issue, the other demonstrates a workaround. An archive is also attached in which these jobs are packaged as a Maven project.
> To reproduce, the attached Spark app does the following:
> * A small dataframe is created from a JSON example.
> * A nested withColumn map transformation is used to apply a transformation on a struct field and create a new struct field. The code for this transformation is also attached.
> * Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // Sample data for a dataframe with nested structs
>   val sample =
>     """
>       |{
>       |  "strings": {
>       |    "simple": "Culpa repellat nesciunt accusantium",
>       |    "all_random": "DESebo8d%fL9sX@AzVin",
>       |    "whitespaces": "qbbl"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 722,
>       |    "small_negative": -660,
>       |    "big_positive": 669223368251997,
>       |    "big_negative": -161176863305841,
>       |    "zero": 0
>       |  }
>       |}
>     """.stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Accusamus quia vel deleniti",
>       |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
>       |    "whitespaces": " t e t rp z p"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 268,
>       |    "small_negative": -134,
>       |    "big_positive": 768990048149640,
>       |    "big_negative": -684718954884696,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Quia numquam deserunt delectus rem est",
>       |    "all_random": "GmRdQlE4Avn1hSlVPAH",
>       |    "whitespaces": " c sayv drf "
>       |  },
>       |  "numerics": {
>       |    "small_positive": 909,
>       |    "small_negative": -363,
>       |    "big_positive": 592517494751902,
>       |    "big_negative": -703224505589638,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin :: Nil
>   /**
>    * This Spark job demonstrates the execution plan freezing when there are a lot of projections
>    * involving nested structs in an execution plan.
>    *
>    * The example works as follows:
>    * - A small dataframe is created from the JSON example above
>    * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>    *   a new struct field.
>    * - Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing
>    *   the execution plan
>    */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>     val df = spark.read.json(sample.toDS)
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upp
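
The attached SparkApp1IssueSelfContained.scala is not reproduced in this thread. As a rough sketch of what a reproducer without the `NestedOps` helper might look like, the following rebuilds a struct with one extra field per step, using only `col`, `struct` and `withColumn`, and folds over a `Range()` so the number of transformations is a single bound. The object name, the helper `addNegatedField`, the trimmed JSON sample and the field names `num1`, `num2`, ... are assumptions made for illustration, and this sketch has not been verified to trigger the hang.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch; this is NOT the attached SparkApp1IssueSelfContained.scala.
object NestedProjectionsSketch {

  // Rebuilds the top-level struct `parent`, keeping every existing field and
  // appending `dst`, computed as the negation of the existing field `src`.
  def addNegatedField(df: DataFrame, parent: String, src: String, dst: String): DataFrame = {
    val fieldNames = df.schema(parent).dataType.asInstanceOf[StructType].fieldNames
    val kept = fieldNames.map(name => col(s"$parent.$name").as(name))
    val added = (-col(s"$parent.$src")).as(dst)
    df.withColumn(parent, struct(kept :+ added: _*))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Nested Projections Sketch")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    // Trimmed-down sample with numeric fields only (assumed shape).
    val sample = Seq("""{"numerics": {"small_positive": 722, "small_negative": -660}}""")
    val df = spark.read.json(sample.toDS)

    // Raise the upper bound of the Range to apply more transformations.
    val dfOutput = Range(1, 6).foldLeft(df) { (acc, i) =>
      addNegatedField(acc, "numerics", "small_positive", s"num$i")
    }

    dfOutput.printSchema()
    dfOutput.show(false)
    spark.stop()
  }
}
```

Each step adds one more projection that reconstructs the whole struct, which is the plan shape the issue description associates with the freeze; whether a given bound is enough to reproduce it will depend on the Spark version.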
[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs
[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862752#comment-16862752 ]

Ruslan Yushchenko commented on SPARK-28016:
-------------------------------------------

Thank you for your quick response.
* Will make the example self-contained.
* Will run on current master and let you know if I can reproduce it.
[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs
[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862693#comment-16862693 ]

Hyukjin Kwon commented on SPARK-28016:
--------------------------------------

Cannot reproduce in the current master.

{code}
scala> dfOutput.show
19/06/13 12:21:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------------+--------------------+
|            numerics|             strings|
+--------------------+--------------------+
|[-161176863305841...|[DESebo8d%fL9sX@A...|
|[-684718954884696...|[rY&n9UnVcD*KS]jP...|
|[-703224505589638...|[GmRdQlE4Avn1hSlV...|
+--------------------+--------------------+
{code}
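
Because the reported symptom is a freeze rather than an error, it can help to run the optimization step under an explicit time limit when checking whether a given build reproduces the problem. A minimal sketch, assuming a hypothetical helper `optimizesWithin` and an arbitrary 30-second limit; it forces Catalyst optimization through `queryExecution.optimizedPlan` on a background thread and reports whether it finished in time. The placeholder dataframe in `main` stands in for the `dfOutput` built by the reproducer.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper, not part of Spark or of the attached applications.
object OptimizerTimeoutCheck {

  // Returns true if the optimized logical plan was produced within `limit`.
  // On timeout the background computation is not cancelled; it keeps running.
  def optimizesWithin(df: DataFrame, limit: FiniteDuration): Boolean = {
    val optimization = Future(df.queryExecution.optimizedPlan)
    try {
      Await.result(optimization, limit)
      true
    } catch {
      case _: TimeoutException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Optimizer Timeout Check")
      .master("local[4]")
      .getOrCreate()

    // Placeholder dataframe; substitute the dataframe under test.
    val df = spark.range(10).toDF("id")
    println(s"Optimized within 30 seconds: ${optimizesWithin(df, 30.seconds)}")

    spark.stop()
  }
}
```

A `false` result only shows that optimization exceeded the chosen limit, so a slow plan and a genuine hang look the same; raising the limit or increasing the number of transformations step by step helps tell them apart.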
[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs
[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862682#comment-16862682 ]

Hyukjin Kwon commented on SPARK-28016:
--------------------------------------

Can you narrow this down and make a self-contained reproducer without code such as {{DataSetWrapper}}?

> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
>                 Key: SPARK-28016
>                 URL: https://issues.apache.org/jira/browse/SPARK-28016
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.3
>         Environment: Tried in
> * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
> * Spark 2.4.3 / Yarn on a Linux cluster
>            Reporter: Ruslan Yushchenko
>            Priority: Major
>         Attachments: NestedOps.scala, SparkApp1Issue.scala, SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze at the execution plan optimization stage (Catalyst) when a logical execution plan contains many projections that operate on nested struct fields.
> Two Spark applications are attached. One demonstrates the issue, the other demonstrates a workaround. An archive is also attached in which these jobs are packaged as a Maven project.
> To reproduce, the attached Spark app does the following:
> * A small dataframe is created from a JSON example.
> * A nested withColumn map transformation is used to apply a transformation on a struct field and create a new struct field. The code for this transformation is also attached.
> * Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
>   // Sample data for a dataframe with nested structs
>   val sample =
>     """
>       |{
>       |  "strings": {
>       |    "simple": "Culpa repellat nesciunt accusantium",
>       |    "all_random": "DESebo8d%fL9sX@AzVin",
>       |    "whitespaces": "qbbl"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 722,
>       |    "small_negative": -660,
>       |    "big_positive": 669223368251997,
>       |    "big_negative": -161176863305841,
>       |    "zero": 0
>       |  }
>       |}
>     """.stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Accusamus quia vel deleniti",
>       |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
>       |    "whitespaces": " t e t rp z p"
>       |  },
>       |  "numerics": {
>       |    "small_positive": 268,
>       |    "small_negative": -134,
>       |    "big_positive": 768990048149640,
>       |    "big_negative": -684718954884696,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin ::
>     """{
>       |  "strings": {
>       |    "simple": "Quia numquam deserunt delectus rem est",
>       |    "all_random": "GmRdQlE4Avn1hSlVPAH",
>       |    "whitespaces": " c sayv drf "
>       |  },
>       |  "numerics": {
>       |    "small_positive": 909,
>       |    "small_negative": -363,
>       |    "big_positive": 592517494751902,
>       |    "big_negative": -703224505589638,
>       |    "zero": 0
>       |  }
>       |}
>       |""".stripMargin :: Nil
>   /**
>    * This Spark job demonstrates the execution plan freezing when there are a lot of projections
>    * involving nested structs in an execution plan.
>    *
>    * The example works as follows:
>    * - A small dataframe is created from the JSON example above
>    * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>    *   a new struct field.
>    * - Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing
>    *   the execution plan
>    */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>     val df = spark.read.json(sample.toDS)
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
>       .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
>       .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
>       .nestedWithColumnMap("numerics.small_n