[ 
https://issues.apache.org/jira/browse/SPARK-5782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14371927#comment-14371927
 ] 

Mark Khaitman commented on SPARK-5782:
--------------------------------------

Interesting Scala equivalent. I ran it and found that it at least didn't cause 
memory exhaustion the way the Python workers do.

I ended up running 2 tests over production data (a rough sketch of the setup 
follows the list), where:
- Both RDDs consisted of approximately 100GB each of shuffle read data from HDFS
- Both came in at about 2000 partitions from HDFS, and I repartitioned them 
down to 32
- I mapped each record to a single key of about 32 bytes and a list of integer 
values (test 1 used 100 integers per key, test 2 used 1000)
- I joined the 2 RDDs and ran a count
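
For reference, here's a rough PySpark sketch of that setup. The HDFS paths, 
the key derivation, and the generated value lists are placeholders, not the 
actual production job:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("join-memory-test")
        # 512MB Python worker limit referenced in the issue description
        .set("spark.python.worker.memory", "512m"))
sc = SparkContext(conf=conf)

VALUES_PER_KEY = 1000  # 100 for test 1, 1000 for test 2

def to_pair(line):
    # ~32-byte key plus a list of integers as the value
    return (line[:32], list(range(VALUES_PER_KEY)))

# ~100GB each, ~2000 HDFS partitions, repartitioned down to 32
rdd_a = sc.textFile("hdfs:///path/to/dataset_a").repartition(32).map(to_pair)
rdd_b = sc.textFile("hdfs:///path/to/dataset_b").repartition(32).map(to_pair)

print(rdd_a.join(rdd_b).count())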

When I ran the test with 100 integers per key, followed by joining the two 
RDDs, the Python workers' memory usage stayed reasonable.

With 1000 integers per key, however, the Python workers reached 2GB each on 
every executor during the join before I had to kill the job.

It seems that either the Python workers are not checking their memory usage 
frequently enough, are not spilling to disk when they should, or Python's GC 
is misbehaving somehow while the shuffle merge is happening.
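
To make the limit concern concrete, here's a tiny standalone simulation of the 
growth pattern the quoted report below describes for _next_limit. It assumes 
the effective behaviour is max(memory_limit, used * 1.05); this is only an 
illustration, not the actual pyspark/shuffle.py code:

MEMORY_LIMIT_MB = 512

def next_limit(used_mb):
    # If usage is already near the configured limit, the 1.05 factor wins
    # the max() and the effective limit ratchets upward instead of forcing
    # a spill at 512MB.
    return max(MEMORY_LIMIT_MB, used_mb * 1.05)

used = 511.0
for _ in range(40):
    used = next_limit(used)  # worker grows to each new, higher limit

print("effective limit after 40 rounds: %.0f MB" % used)  # well past 512MB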

> Python Worker / Pyspark Daemon Memory Issue
> -------------------------------------------
>
>                 Key: SPARK-5782
>                 URL: https://issues.apache.org/jira/browse/SPARK-5782
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Shuffle
>    Affects Versions: 1.3.0, 1.2.1, 1.2.2
>         Environment: CentOS 7, Spark Standalone
>            Reporter: Mark Khaitman
>            Priority: Blocker
>
> I'm including the Shuffle component on this, as a brief scan through the code 
> (which I'm not 100% familiar with just yet) shows a large amount of memory 
> handling in it:
> It appears that any type of join between two RDDs spawns twice as many 
> pyspark.daemon workers as the default 1 task -> 1 core configuration in our 
> environment. This can become problematic in cases where you build up a tree 
> of RDD joins, since the pyspark.daemon processes do not cease to exist until 
> the top-level join is completed (or so it seems)... This can lead to memory 
> exhaustion by a single framework, even though it is set to a 512MB Python 
> worker memory limit and a few GB of executor memory.
> Another related issue is that the individual Python workers are not supposed 
> to exceed 512MB by much at all; beyond that, they are supposed to spill to 
> disk.
> Some of our Python workers are somehow reaching 2GB each (which, when 
> multiplied by the number of cores per executor and the number of joins 
> occurring in some cases, causes the out-of-memory killer to step up to its 
> unfortunate job)! :(
> I think that with the _next_limit method in shuffle.py, if the current memory 
> usage is close to the memory limit, the 1.05 multiplier can endlessly cause 
> more memory to be consumed by a single Python worker, since max(512, 511 * 
> 1.05) keeps resolving to the growing second term... Shouldn't the memory 
> limit be the absolute cap in this case?
> I've only just started looking into the code, and would definitely love to 
> contribute towards Spark, though I figured it might be quicker to resolve if 
> someone already owns the code!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
