[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223946#comment-16223946 ]

Ohad Raviv commented on SPARK-21657:
------------------------------------

Sure, the plan for
{code:java}
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2")).selectExpr("c1", "c2.*")
{code}
is 
{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('c1, None), ArrayBuffer(c2).*]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
c1: string, _1: string, _2: string, _3: string, _4: string
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, 
c2#25._4 AS _4#43]
+- Project [c1#6, c2#25]
   +- Generate explode(c_arr#7), true, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, 
c2#25._4 AS _4#43]
+- Generate explode(c_arr#7), true, false, [c2#25]
   +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
      +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*Project [c1#6, c2#25._1 AS _1#40, c2#25._2 AS _2#41, c2#25._3 AS _3#42, 
c2#25._4 AS _4#43]
+- Generate explode(c_arr#7), true, false, [c2#25]
   +- *Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
      +- *SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4]
         +- Scan ExternalRDDScan[obj#2]
{noformat}
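For reference, the plans suggest {{df}} was built from a {{Seq}} of {{(String, Seq[(String, String, String, String)])}} pairs (see the {{SerializeFromObject}} node). A minimal sketch, with column names taken from the {{Project}} node and data values purely illustrative:
{code:java}
// Hypothetical setup inferred from the plan above; values are illustrative.
import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  ("a", Seq(("1", "2", "3", "4"), ("5", "6", "7", "8")))
).toDF("c1", "c_arr")
{code}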
and the plan for
{code:java}
val df_exploded = df.select(explode($"c_arr").as("c2")).selectExpr("c2.*")
{code}
is 
{noformat}
== Parsed Logical Plan ==
'Project [ArrayBuffer(c2).*]
+- Project [c2#25]
   +- Generate explode(c_arr#7), false, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
_1: string, _2: string, _3: string, _4: string
Project [c2#25._1 AS _1#38, c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS 
_4#41]
+- Project [c2#25]
   +- Generate explode(c_arr#7), false, false, [c2#25]
      +- Project [_1#3 AS c1#6, _2#4 AS c_arr#7]
         +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#4]
            +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [c2#25._1 AS _1#38, c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS 
_4#41]
+- Generate explode(c_arr#7), false, false, [c2#25]
   +- Project [_2#4 AS c_arr#7]
      +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*Project [c2#25._1 AS _1#38, c2#25._2 AS _2#39, c2#25._3 AS _3#40, c2#25._4 AS 
_4#41]
+- Generate explode(c_arr#7), false, false, [c2#25]
   +- *Project [_2#4 AS c_arr#7]
      +- *SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, 
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class 
scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))) null else 
named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._1, true), _2, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._2, true), _3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._3, true), _4, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(lambdavariable(MapObjects_loopValue0, 
MapObjects_loopIsNull0, ObjectType(class scala.Tuple4), true))._4, true)), 
assertnotnull(input[0, scala.Tuple2, true])._2, None) AS _2#4]
         +- Scan ExternalRDDScan[obj#2]
{noformat}
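The main difference is visible in the optimized plans: in the second query the {{Project}} below {{Generate}} is pruned to {{c_arr}} alone ({{Project [_2#4 AS c_arr#7]}}), while the first query carries {{c1}} through as well. Either dump can be reproduced with the extended explain:
{code:java}
// Prints all four stages shown above: parsed, analyzed, optimized, physical.
df_exploded.explain(true)
{code}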

> Spark has exponential time complexity to explode(array of structs)
> ------------------------------------------------------------------
>
>                 Key: SPARK-21657
>                 URL: https://issues.apache.org/jira/browse/SPARK-21657
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>            Reporter: Ruslan Dautkhanov
>              Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
>         Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5M records) on recent Xeon processors.
> See the attached pyspark script, which reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print cached_df.count()
> {code}
> This script generates a number of tables with the same total number of 
> records across all nested collections (see the `scaling` variable in the 
> loops). The `scaling` variable scales up the number of nested elements in 
> each record, but scales down the number of records in the table by the same 
> factor, so the total number of records stays the same.
> Time grows exponentially (note the log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At a scaling of 50,000 (see the attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of just 8k records.
> Beyond 1,000 elements per nested collection, time grows exponentially.


