[ https://issues.apache.org/jira/browse/SPARK-18367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicholas Chammas updated SPARK-18367:
-------------------------------------
    Description: 
I have a moderately complex DataFrame query that spawns north of 10K open files, causing my job to crash. 10K is the macOS limit on how many files a single process can have open at once. It seems unreasonable for Spark to hold that many files open simultaneously.

I was able to boil down what I'm seeing to the following minimal reproduction:

{code}
import pyspark
from pyspark.sql import Row


if __name__ == '__main__':
    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        Row(a=n)
        for n in range(500000)
    ]).coalesce(1)  # a coalesce(1) here "fixes" the problem
    df = df.join(df, on='a')  #.coalesce(1)  # a coalesce(1) here doesn't help

    print('partitions:', df.rdd.getNumPartitions())
    df.explain()
    df.show(1)
{code}
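For reference, here is one way to watch the open-file count while the repro runs. This is a sketch, not part of the repro: it assumes the third-party {{psutil}} package, and it counts the driver plus its child processes (the JVM that PySpark launches is what actually holds the shuffle files).

{code}
import psutil

def count_open_files():
    # Sum open files across this process and its children; the PySpark
    # driver's JVM child process holds the shuffle files. Illustrative only.
    me = psutil.Process()
    total = 0
    for proc in [me] + me.children(recursive=True):
        try:
            total += len(proc.open_files())
        except psutil.Error:  # e.g. process exited or access denied
            pass
    return total
{code}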

When I run this code, Spark spawns over 2K open files. I can "fix" the problem by adding a {{coalesce(1)}} in the right place, as indicated in the comments above. When I do, Spark spawns no more than 600 open files. Without the coalesce, the query runs with 200 partitions.
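That 200 is the default value of {{spark.sql.shuffle.partitions}}, the number of post-shuffle partitions Spark SQL uses; it is what the {{Exchange hashpartitioning(..., 200)}} in the first plan below reflects. A quick check, as a minimal sketch against the session from the repro:

{code}
# The 200 partitions come from Spark SQL's shuffle-partition setting.
# spark.conf is the Spark 2.x runtime-config interface.
print(spark.conf.get('spark.sql.shuffle.partitions'))  # '200' by default
{code}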

Here are the execution plans with and without the coalesce.

2K+ open files, without the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#3L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#0L, 200)
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#3L, 200)
         +- *Filter isnotnull(a#3L)
            +- Scan ExistingRDD[a#3L]
{code}

<600 open files, with the coalesce:
{code}
== Physical Plan ==
*Project [a#0L]
+- *SortMergeJoin [a#0L], [a#4L], Inner
   :- *Sort [a#0L ASC NULLS FIRST], false, 0
   :  +- Coalesce 1
   :     +- *Filter isnotnull(a#0L)
   :        +- Scan ExistingRDD[a#0L]
   +- *Sort [a#4L ASC NULLS FIRST], false, 0
      +- Coalesce 1
         +- *Filter isnotnull(a#4L)
            +- Scan ExistingRDD[a#4L]
{code}

So the key difference appears to be the {{Exchange hashpartitioning(a#0L, 200)}} operator.

Is the large number of open files perhaps expected, given that the join is on a large number of distinct keys? If so, how would one mitigate the issue? If not, is this a bug in Spark?
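If it is expected, one mitigation that seems plausible is to lower {{spark.sql.shuffle.partitions}} when building the session. This is a sketch of the idea, assuming the open-file count scales with the number of shuffle partitions (which the coalesce experiment above suggests), not a confirmed fix:

{code}
import pyspark

# Possible mitigation, assuming open files scale with shuffle partitions:
# reduce the post-shuffle partition count from its default of 200.
# The value 8 is purely illustrative.
spark = (
    pyspark.sql.SparkSession.builder
    .config('spark.sql.shuffle.partitions', '8')
    .getOrCreate()
)
{code}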

  was:
I have a complex DataFrame query that fails to run normally but succeeds if I 
add a dummy {{limit()}} upstream in the query tree.

The failure presents itself like this:

{code}
ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /private/var/folders/f5/t48vxz555b51mr3g6jjhxv400000gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc
java.io.FileNotFoundException: /private/var/folders/f5/t48vxz555b51mr3g6jjhxv400000gq/T/blockmgr-1e908314-1d49-47ba-8c95-fa43ff43cee4/31/temp_shuffle_6b939273-c6ce-44a0-99b1-db668e6c89dc (Too many open files in system)
{code}

My {{ulimit -n}} is already set to 10,000, and I can't set it much higher on macOS. However, I don't think that's the issue, since the same query works if I add a dummy {{limit()}} early in the query tree -- dummy in the sense that it does not actually reduce the number of rows queried.
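For illustration, the workaround looks roughly like this. This is a sketch: the actual input paths and schema are elided in the attached plans, so the read below is hypothetical.

{code}
# Rough sketch of the workaround; the input path is hypothetical.
# The limit threshold is far above the dataset's actual row count,
# so it drops no rows but still changes the physical plan.
df = spark.read.orc('/path/to/input')
df = df.limit(1000000)  # dummy limit; actual cardinality is well below 1M
{code}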

I've diffed the physical query plans to see what this {{limit()}} is actually 
doing, and the diff is as follows:

{code}
diff plan-with-limit.txt plan-without-limit.txt
24,28c24
<    :                          :     :     +- *GlobalLimit 1000000
<    :                          :     :        +- Exchange SinglePartition
<    :                          :     :           +- *LocalLimit 1000000
<    :                          :     :              +- *Project [...]
<    :                          :     :                 +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
---
>    :                          :     :     +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
49,53c45
<                               :     :     +- *GlobalLimit 1000000
<                               :     :        +- Exchange SinglePartition
<                               :     :           +- *LocalLimit 1000000
<                               :     :              +- *Project [...]
<                               :     :                 +- *Scan orc [...] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
---
>                               :     :     +- *Scan orc [] Format: ORC, InputPaths: file:/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
{code}

Does this give any clues as to why the {{limit()}} helps? Again, the 1000000 limit visible in the plan is much higher than the cardinality of the dataset I'm reading, so in theory it has no impact on the output. The full query plans are attached to this ticket.

Unfortunately, I don't have a minimal reproduction for this issue yet, but I can work towards one given some clues.

I'm seeing this behavior on 2.0.1 and on master at commit 
{{26e1c53aceee37e3687a372ff6c6f05463fd8a94}}.


> DataFrame join spawns unreasonably high number of open files
> ------------------------------------------------------------
>
>                 Key: SPARK-18367
>                 URL: https://issues.apache.org/jira/browse/SPARK-18367
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.1, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>


