[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223902#comment-16223902 ]

Ohad Raviv commented on SPARK-21657:
------------------------------------

I switched to toArray instead of toList in the above code and got an 
improvement by a factor of 2, but the main bottleneck remains.
Now the difference in the above example between:
{code:java}
val df_exploded = df.select(expr("c1"), explode($"c_arr").as("c2"))
{code}
and:
{code:java}
val df_exploded = df.select(explode($"c_arr").as("c2"))
{code}
is 128 secs vs. 3 secs.
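
For completeness, a minimal sketch (in spark-shell) of the kind of DataFrame the comparison above assumes. This is not the original setup; the column names c1 and c_arr match the example, but the row count and array length here are only illustrative:
{code:java}
// Illustrative setup only: a small key column c1 plus a long array column c_arr.
// Row count and array length are arbitrary, chosen just to make the effect visible.
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq.tabulate(100)(i => (i, (1 to 100000).toArray)).toDF("c1", "c_arr")

// Slow variant: c1 is carried alongside every exploded element.
val slowExploded = df.select(expr("c1"), explode($"c_arr").as("c2"))

// Fast variant: only the exploded column is produced.
val fastExploded = df.select(explode($"c_arr").as("c2"))
{code}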

Again I profiled the former and saw that essentially all of the time is consumed in:
org.apache.spark.unsafe.Platform.copyMemory(): 23,991 ms (97.5%)

The obvious difference between the execution plans is that the former has two 
WholeStageCodegen stages and the latter just one.
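For reference, the plans and the generated code can be inspected with something like the following (a sketch; debugCodegen comes from org.apache.spark.sql.execution.debug, and df is the DataFrame from the sketch above):
{code:java}
// Compare the physical plans and dump the generated Java code for the slow case.
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.functions._

df.select(expr("c1"), explode($"c_arr").as("c2")).explain()  // two WholeStageCodegen stages
df.select(explode($"c_arr").as("c2")).explain()              // one WholeStageCodegen stage

df.select(expr("c1"), explode($"c_arr").as("c2")).debugCodegen()  // generated code of the slow plan
{code}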
I didn't fully understand the generated code, but my guess is that in the 
problematic case the generated explode code actually copies the long array into 
every exploded row and only projects it away at the end.
Please see if you can verify this or think of a workaround for it.
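
One possible workaround along these lines, purely as a sketch and not verified against this issue: explode the array column on its own, keyed by a generated row id, and join the remaining columns back afterwards so they are not carried through the explode:
{code:java}
// Hypothetical workaround sketch (df as in the setup above, not verified here):
// explode c_arr alone, keyed by a generated row id, then join c1 back afterwards
// so the wide columns are not copied into every exploded row.
import org.apache.spark.sql.functions._

val withId       = df.withColumn("rid", monotonically_increasing_id())
val explodedOnly = withId.select($"rid", explode($"c_arr").as("c2"))
val workaround   = explodedOnly.join(withId.select($"rid", $"c1"), "rid").drop("rid")
{code}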



> Spark has exponential time complexity to explode(array of structs)
> ------------------------------------------------------------------
>
>                 Key: SPARK-21657
>                 URL: https://issues.apache.org/jira/browse/SPARK-21657
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>            Reporter: Ruslan Dautkhanov
>              Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
>         Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m) on recent Xeon processors.
> See the attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache()
> print cached_df.count()
> {code}
> This script generates a number of tables with the same total number of 
> records across all nested collections (see the `scaling` variable in the loops). 
> The `scaling` variable scales up the number of nested elements in each record, 
> but scales down the number of records in the table by the same factor, so the 
> total number of records stays the same.
> Time grows exponentially (notice the log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At a scaling of 50,000 (see the attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> Beyond 1000 elements per nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
