[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223946#comment-16223946 ]
Ohad Raviv commented on SPARK-21657: ------------------------------------ Sure, the plan for {code:java} val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2")).selectExpr("c1" ,"c2.*") {code} is {noformat} == Parsed Logical Plan == 'Project [unresolvedalias('c1, None), ArrayBuffer(c2).*] +- Project [c1#6, c2#25] +- Generate explode(c_arr#7), true, false, [c2#25] +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4] +- ExternalRDD [obj#2] == Analyzed Logical Plan == c1: string, _1: string, _2: string, _3: string, _4: string Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, c2#25._4 AS _4#43] +- Project [c1#6, c2#25] 
+- Generate explode(c_arr#7), true, false, [c2#25] +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4] +- ExternalRDD [obj#2] == Optimized Logical Plan == Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, c2#25._4 AS _4#43] +- Generate explode(c_arr#7), true, false, [c2#25] +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4] +- ExternalRDD [obj#2] == Physical Plan == *Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, c2#25._4 AS _4#43] +- Generate explode(c_arr#7), true, false, [c2#25] +- *Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4] +- Scan ExternalRDDScan[obj#2] {noformat} and for: {code:java} val df_exploded = df.select(explode($"c_arr").as("c2")).selectExpr("c2.*") {code} is {noformat} == Parsed Logical Plan == 'Project [ArrayBuffer(c2).*] +- Project [c2#25] +- Generate explode(c_arr#7), false, false, [c2#25] +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4] +- ExternalRDD [obj#2] == Analyzed Logical Plan == _1: string, _2: string, _3: string, _4: string Project [c2#25._1 AS _1#38, c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS _4#41] +- Project [c2#25] +- Generate explode(c_arr#7), false, false, [c2#25] +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4] +- ExternalRDD [obj#2] == Optimized Logical Plan == Project [c2#25._1 AS _1#38, 
c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS _4#41] +- Generate explode(c_arr#7), false, false, [c2#25] +- Project [_2#4 AS c_arr#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4] +- ExternalRDD [obj#2] == Physical Plan == *Project [c2#25._1 AS _1#38, c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS _4#41] +- Generate explode(c_arr#7), false, false, [c2#25] +- *Project [_2#4 AS c_arr#7] +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4] +- Scan ExternalRDDScan[obj#2] {noformat} > Spark has exponential time complexity to explode(array of structs) > ------------------------------------------------------------------ > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL > Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0 > Reporter: Ruslan Dautkhanov > Labels: cache, caching, collections, nested_types, performance, > pyspark, sparksql, sql > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sized nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. 
> {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generates a number of tables, with the same total number of > records across all nested collections (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling of 50,000 (see attached pyspark script), it took 7 hours to > explode the nested collections (\!) of 8k records. > After 1000 elements in a nested collection, time grows exponentially. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org