[ 
https://issues.apache.org/jira/browse/SPARK-27991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327118#comment-17327118
 ] 

Apache Spark commented on SPARK-27991:
--------------------------------------

User 'Ngone51' has created a pull request for this issue:
https://github.com/apache/spark/pull/32287

> ShuffleBlockFetcherIterator should take Netty constant-factor overheads into 
> account when limiting number of simultaneous block fetches
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27991
>                 URL: https://issues.apache.org/jira/browse/SPARK-27991
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major
>
> ShuffleBlockFetcherIterator has logic to limit the number of simultaneous 
> block fetches. By default, this logic tries to keep the number of outstanding 
> block fetches [beneath a data size 
> limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274]
>  ({{maxBytesInFlight}}). However, this limiting does not take fixed overheads 
> into account: even though a remote block might be, say, 4KB, there are 
> certain fixed-size internal overheads due to Netty buffer sizes which may 
> cause the actual space requirements to be larger.
> As a result, if a map stage produces a huge number of extremely tiny blocks 
> then we may see errors like
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 
> byte(s) of direct memory (used: 39325794304, max: 39325794304)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
> [...]
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 
> 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
> at 
> io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
> at 
> io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
> at 
> io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
> [...]{code}
> SPARK-24989 is another report of this problem (but with a different proposed 
> fix).
> This problem can currently be mitigated by setting 
> {{spark.reducer.maxReqsInFlight}} to some some non-IntMax value (SPARK-6166), 
> but this additional manual configuration step is cumbersome.
> Instead, I think that Spark should take these fixed overheads into account in 
> the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes, 
> use {{Math.min(blockSize, minimumNettyBufferSize)}}. There might be some 
> tricky details involved to make this work on all configurations (e.g. to use 
> a different minimum when direct buffers are disabled, etc.), but I think the 
> core idea behind the fix is pretty simple.
> This will improve Spark's stability and removes configuration / tuning burden 
> from end users.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to