[ https://issues.apache.org/jira/browse/SPARK-28090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876145#comment-16876145 ]
Iskender Unlu commented on SPARK-28090:
---------------------------------------

I will work on this issue.

> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
>                 Key: SPARK-28090
>                 URL: https://issues.apache.org/jira/browse/SPARK-28090
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.4.3
>         Environment: Tried in
> * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
> * Spark 2.4.3 / YARN on a Linux cluster
>            Reporter: Ruslan Yushchenko
>            Priority: Major
>
> This was already posted (#28016), but the provided example didn't always
> reproduce the error. This example consistently reproduces the issue.
> Spark applications freeze at the execution plan optimization stage (Catalyst)
> when a logical execution plan contains a lot of projections that operate on
> nested struct fields.
> The code listed below demonstrates the issue.
> To reproduce the issue, the Spark app does the following:
> * A small dataframe is created from a JSON example.
> * Several nested transformations (negation of a number) are applied to
> struct fields, and each time a new struct field is created.
> * Once more than 9 such transformations are applied, the Catalyst optimizer
> freezes while optimizing the execution plan.
> * You can control the freezing by choosing a different upper bound for the
> Range. E.g. it will work fine if the upper bound is 5, but will hang if the
> bound is 10.
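A note on the last bullet: the dependence on the upper bound can be probed without blocking the driver forever by running plan construction and optimization under a timeout. The following is only a minimal sketch, not part of the original report. It assumes the reporter's `SparkApp1IssueSelfContained` object (from the listing that follows) is on the classpath, and the object name `OptimizerHangProbe`, the chosen bounds and the 60-second timeout are hypothetical.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

// Assumption: the reporter's object from the listing below is compiled alongside this file.
import com.example.SparkApp1IssueSelfContained.{sample, transformInsideNestedStruct}

// Hypothetical helper object, not part of the original report.
object OptimizerHangProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Optimizer Hang Probe")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    val dfInput = spark.read.json(sample.toDS)

    // Bounds taken from the report (5 is said to work, 10 is said to hang).
    // They are tried in ascending order so that a stuck attempt comes last.
    for (upperBound <- Seq(5, 10)) {
      // Build the plan and force optimization inside the Future, so that a hang
      // in either step is covered by the timeout. No Spark job is executed here.
      val attempt = Future {
        val dfOutput = Range(1, upperBound).foldLeft(dfInput) { (acc, i) =>
          transformInsideNestedStruct(acc, s"numerics.num$i", s"out_num$i", c => -c)
        }
        dfOutput.queryExecution.optimizedPlan
      }
      try {
        Await.result(attempt, 60.seconds) // hypothetical timeout
        println(s"upper bound $upperBound: plan optimized")
      } catch {
        case _: TimeoutException =>
          println(s"upper bound $upperBound: no optimized plan after 60 seconds")
      }
    }
    spark.stop()
  }
}
```

A timed-out attempt is not cancelled. Its thread keeps running in the background until the JVM exits, so this is useful for a quick check only, not as a workaround.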
> {code:java}
> package com.example
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types.{StructField, StructType}
> import scala.collection.mutable.ListBuffer
> object SparkApp1IssueSelfContained {
>   // A sample data for a dataframe with nested structs
>   val sample: List[String] =
>     """ { "numerics": {"num1": 101, "num2": 102, "num3": 103, "num4": 104, "num5": 105, "num6": 106, "num7": 107, "num8": 108, "num9": 109, "num10": 110, "num11": 111, "num12": 112, "num13": 113, "num14": 114, "num15": 115} } """ ::
>     """ { "numerics": {"num1": 201, "num2": 202, "num3": 203, "num4": 204, "num5": 205, "num6": 206, "num7": 207, "num8": 208, "num9": 209, "num10": 210, "num11": 211, "num12": 212, "num13": 213, "num14": 214, "num15": 215} } """ ::
>     """ { "numerics": {"num1": 301, "num2": 302, "num3": 303, "num4": 304, "num5": 305, "num6": 306, "num7": 307, "num8": 308, "num9": 309, "num10": 310, "num11": 311, "num12": 312, "num13": 313, "num14": 314, "num15": 315} } """ ::
>     Nil
>   /**
>     * Transforms a column inside a nested struct. The transformed value will be put into a new field of that nested struct
>     *
>     * The output column name can omit the full path as the field will be created at the same level of nesting as the input column.
>     *
>     * @param inputColumnName  A column name for which to apply the transformation, e.g. `company.employee.firstName`.
>     * @param outputColumnName The output column name. The path is optional, e.g. you can use `transformedName` instead of `company.employee.transformedName`.
>     * @param expression       A function that applies a transformation to a column as a Spark expression.
>     * @return A dataframe with a new field that contains transformed values.
>     */
>   def transformInsideNestedStruct(df: DataFrame,
>                                   inputColumnName: String,
>                                   outputColumnName: String,
>                                   expression: Column => Column): DataFrame = {
>     def mapStruct(schema: StructType, path: Seq[String], parentColumn: Option[Column] = None): Seq[Column] = {
>       val mappedFields = new ListBuffer[Column]()
>       def handleMatchedLeaf(field: StructField, curColumn: Column): Seq[Column] = {
>         val newColumn = expression(curColumn).as(outputColumnName)
>         mappedFields += newColumn
>         Seq(curColumn)
>       }
>       def handleMatchedNonLeaf(field: StructField, curColumn: Column): Seq[Column] = {
>         // Non-leaf columns need to be further processed recursively
>         field.dataType match {
>           case dt: StructType => Seq(struct(mapStruct(dt, path.tail, Some(curColumn)): _*).as(field.name))
>           case _ => throw new IllegalArgumentException(s"Field '${field.name}' is not a struct type.")
>         }
>       }
>       val fieldName = path.head
>       val isLeaf = path.lengthCompare(2) < 0
>       val newColumns = schema.fields.flatMap(field => {
>         // This is the original column (struct field) we want to process
>         val curColumn = parentColumn match {
>           case None => new Column(field.name)
>           case Some(col) => col.getField(field.name).as(field.name)
>         }
>         if (field.name.compareToIgnoreCase(fieldName) != 0) {
>           // Copy unrelated fields as they were
>           Seq(curColumn)
>         } else {
>           // We have found a match
>           if (isLeaf) {
>             handleMatchedLeaf(field, curColumn)
>           } else {
>             handleMatchedNonLeaf(field, curColumn)
>           }
>         }
>       })
>       newColumns ++ mappedFields
>     }
>     val schema = df.schema
>     val path = inputColumnName.split('.')
>     df.select(mapStruct(schema, path): _*)
>   }
>   /**
>     * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
>     * involving nested structs in an execution plan.
>     *
>     * The example works as follows:
>     * - A small dataframe is created from a JSON example above
>     * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
>     *   a new struct field.
>     * - Once more than 9 such transformations are applied the Catalyst optimizer freezes on optimizing
>     *   the execution plan
>     *
>     */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue Example")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>     import spark.implicits._
>     val dfInput = spark.read.json(sample.toDS)
>     // Apply several negations. You can change the number of negations by changing the upper bound of the range up to 16
>     val dfOutput = Range(1,12).foldLeft(dfInput)( (df, i) => {
>       transformInsideNestedStruct(df, s"numerics.num$i", s"out_num$i", c => -c)
>     })
>     dfOutput.printSchema()
>     dfOutput.explain(true)
>     dfOutput.show(false)
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org