Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/18388
  
    Previously I said that I had 200k+ connections to one shuffle service. I'm sorry, that information was wrong. It turns out that each of our `NodeManager`s runs two auxiliary shuffle services, one for Spark and one for Hadoop MapReduce, and most of the connections belong to the Hadoop MapReduce shuffle service. Analyzing the heap dump, there are only 761 connections (`NioSocketChannel`) to the Spark shuffle service. (How did I find this? The Spark shuffle service uses Netty 4 for transferring blocks, but I found tons of `org.jboss.netty.channel.socket.nio.NioAcceptedSocketChannel`; checking the Netty code, that class only exists in Netty 3, which is what our Hadoop uses.) So @zsxwing, there is no connection leak in my understanding.
    
    The situation is that 10K map tasks ran on the same node (so their output is served by the same shuffle service) and around 1K reduce tasks are fetching the blocks. On the Java heap I found that one `io.netty.channel.ChannelOutboundBuffer.Entry` (referencing one block) costs almost 1KB, and we had 3.5M entries. At the time of the OOM, the `io.netty.channel.ChannelOutboundBuffer.Entry`s were retaining about 3GB.
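    
    For reference, the back-of-the-envelope math behind those numbers (a rough check only, assuming ~1KB per `Entry` as observed in the heap dump):
    
    ```scala
    // Rough check of the retained size reported above (assumes ~1KB per Entry).
    val entries = 3500000L                   // ChannelOutboundBuffer.Entry instances on the heap
    val bytesPerEntry = 1024L                // ~1KB each, per the heap dump
    val retainedGB = entries * bytesPerEntry / 1e9
    println(f"~$retainedGB%.1f GB retained") // ~3.6 GB, in line with the ~3GB seen at OOM time
    ```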
    
    So the problem here is that one connection is fetching too many blocks. I believe tuning `spark.reducer.maxReqsInFlight` or `spark.reducer.maxBlocksInFlightPerAddress` (#18487) can alleviate the issue. The question is how to set them appropriately, which seems hard because we need to balance warehouse performance against stability. After all, only 2~3 NodeManagers are running OOM, so we cannot set `spark.reducer.maxReqsInFlight` very small without degrading performance everywhere else.
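    
    For illustration only (the values below are made up, not a recommendation, and `spark.reducer.maxBlocksInFlightPerAddress` only exists with #18487 applied), the tuning would look like:
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Illustrative values only; the right numbers depend on the workload.
    val conf = new SparkConf()
      .set("spark.reducer.maxReqsInFlight", "64")               // cap concurrent fetch requests per reducer
      .set("spark.reducer.maxBlocksInFlightPerAddress", "256")  // cap blocks in flight to one shuffle service (#18487)
    ```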
    
    I checked the connections of one shuffle service yesterday; 5K connections is very common during the night. That is easy to reach, say when 5K reduce tasks are running at the same time. What if there are 5 applications, each with 5K reduce tasks? That would be 25K connections. If each connection is fetching 100 blocks and each `Entry` is 1KB, the memory cost is about 2.5GB. I think that is too much.
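    
    The same rough math for this projected case (the counts are the hypothetical ones above):
    
    ```scala
    // Projected cost of outbound buffer entries on a single shuffle service.
    val connections = 5 * 5000               // 5 applications, 5K reduce tasks each
    val blocksPerConnection = 100
    val bytesPerEntry = 1024L                // ~1KB per ChannelOutboundBuffer.Entry
    val costGB = connections * blocksPerConnection * bytesPerEntry / 1e9
    println(f"~$costGB%.1f GB")              // ~2.6 GB just for outbound buffer entries
    ```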
    
    So I'm still proposing concurrency control. Different from the current change, can we control the number of blocks being transferred? If that number goes above a high-water mark, we can fail newly arriving `OpenBlocks` requests.
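    
    To make the idea concrete, below is a rough sketch of the kind of water-mark check I have in mind on the shuffle-service side. This is hypothetical code, not the actual Spark handler; the class name and the threshold are made up:
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    // Hypothetical sketch, not actual Spark code: the shuffle service would call tryAcquire()
    // when an OpenBlocks message arrives and release() once the corresponding blocks have been
    // sent (or the channel is closed).
    class BlockTransferLimiter(highWaterMark: Long) {
      private val blocksInTransfer = new AtomicLong(0L)
    
      /** Reserve numBlocks; returns false (fail the OpenBlocks) if the water mark would be exceeded. */
      def tryAcquire(numBlocks: Int): Boolean = {
        val after = blocksInTransfer.addAndGet(numBlocks.toLong)
        if (after > highWaterMark) {
          blocksInTransfer.addAndGet(-numBlocks.toLong)  // roll back the reservation
          false
        } else {
          true
        }
      }
    
      /** Called as blocks finish transferring. */
      def release(numBlocks: Int): Unit = blocksInTransfer.addAndGet(-numBlocks.toLong)
    }
    ```
    
    Whether a rejected `OpenBlocks` should be retried by the client or surfaced as a fetch failure is open for discussion; the point is that a hot shuffle service gets back-pressure instead of an OOM.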

