[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935552#comment-16935552 ]

Min Shen commented on SPARK-29206:
----------------------------------

[~redsanket], [~tgraves],

Since you worked on committing the original patch, would appreciate your 
comments here.

> Number of shuffle Netty server threads should be a multiple of number of 
> chunk fetch handler threads
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29206
>                 URL: https://issues.apache.org/jira/browse/SPARK-29206
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.0.0
>            Reporter: Min Shen
>            Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
> to handle the slow-to-process chunk fetch requests in order to improve the 
> responsiveness of shuffle service for RPC requests.
> Initially, we thought that making the number of Netty server threads larger 
> than the number of chunk fetch handler threads would reserve some threads for 
> RPC requests, thus resolving the various RPC request timeout issues we had 
> experienced previously. The solution worked in our cluster initially. 
> However, as the number of Spark applications in our cluster continued to 
> increase, we saw the RPC request (SASL authentication specifically) timeout 
> issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>       at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>       at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>       at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>       at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>       at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>       at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>       at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent 
> clients connecting to a shuffle service increases, it becomes _VERY_ 
> important to configure the number of Netty server threads and the number of 
> chunk fetch handler threads correctly. Specifically, the number of Netty 
> server threads needs to be a multiple of the number of chunk fetch handler 
> threads. The reason is explained in detail below:
> When a channel is established on the Netty server, it is registered with both 
> the Netty server's default EventLoopGroup and the chunk fetch handler 
> EventLoopGroup. Once registered, the channel sticks with a given thread in 
> each EventLoopGroup, i.e. all requests from this channel are going to be 
> handled by the same thread. Right now, the Spark shuffle Netty server uses 
> Netty's default strategy to select the thread from an EventLoopGroup that a 
> new channel gets associated with, which is simply round-robin (Netty's 
> DefaultEventExecutorChooserFactory).
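> As a rough mental model of that round-robin assignment (a simplified sketch, 
> not Netty's actual source; it assumes channels are numbered in the order they 
> are accepted), the n-th accepted channel is pinned to thread (n % poolSize) 
> in each EventLoopGroup:
> {noformat}
> // Simplified stand-in for Netty's round-robin EventExecutorChooser.
> final class RoundRobinChooser {
>   private final int poolSize;
>   private int nextChannel = 0;
>
>   RoundRobinChooser(int poolSize) {
>     this.poolSize = poolSize;
>   }
>
>   // Index of the thread the next accepted channel gets pinned to.
>   int nextThread() {
>     return nextChannel++ % poolSize;
>   }
> }
> {noformat}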
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all 
> chunk fetch requests from a given channel will first be added to the task 
> queue of the chunk fetch handler thread associated with that channel. When 
> a request gets processed, the chunk fetch handler thread will submit a task 
> to the task queue of the Netty server thread that's also associated with 
> this channel. If the number of Netty server threads is not a multiple of the 
> number of chunk fetch handler threads, this becomes a problem when the 
> server has a large number of concurrent connections.
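> To make that hand-off concrete, here is a minimal sketch (not the actual 
> ChunkFetchRequestHandler code) of what happens when a chunk fetch handler 
> thread writes the response back: because the write is issued from a thread 
> that is not the channel's Netty server EventLoop, Netty wraps it into a task 
> and enqueues it on that EventLoop's task queue.
> {noformat}
> import io.netty.channel.Channel;
>
> class ChunkFetchResponder {
>   // Runs on a chunk fetch handler thread (the separate EventLoopGroup).
>   void respond(Channel channel, Object chunkFetchResponse) {
>     // channel.eventLoop() is the Netty server thread this channel is pinned
>     // to. Since the current thread is not that EventLoop, writeAndFlush()
>     // does not write inline; it enqueues a write task on that server
>     // thread's task queue instead.
>     channel.writeAndFlush(chunkFetchResponse);
>   }
> }
> {noformat}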
> Assume we configure the number of Netty server threads as 40 and the 
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk 
> fetch handler threads. Then, according to the round-robin policy, channels 0, 
> 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty 
> server thread in the default EventLoopGroup. However, since the chunk fetch 
> handler thread pool only has 35 threads, out of these 8 channels only 
> channels 0 and 280 will be associated with the same chunk fetch handler 
> thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be 
> associated with different chunk fetch handler threads while being associated 
> with the same Netty server thread. This means the 7 different chunk fetch 
> handler threads associated with these channels could potentially submit 
> tasks to the task queue of the same Netty server thread at the same time, 
> leaving 7 slow-to-process requests sitting in that task queue. If an RPC 
> request is put in the task queue after these 7 requests, it is very likely 
> to time out.
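> The arithmetic above can be reproduced with a tiny sketch (it assumes pure 
> round-robin assignment by channel accept order, a simplification of Netty's 
> chooser):
> {noformat}
> public class ThreadAssignmentDemo {
>   public static void main(String[] args) {
>     int serverThreads = 40;
>     int handlerThreads = 35;  // ceil(40 * 87%) = 35
>     for (int channel = 0; channel <= 280; channel += 40) {
>       System.out.printf("channel %3d -> server thread %d, handler thread %d%n",
>           channel, channel % serverThreads, channel % handlerThreads);
>     }
>     // channel   0 -> server thread 0, handler thread 0
>     // channel  40 -> server thread 0, handler thread 5
>     // channel  80 -> server thread 0, handler thread 10
>     // ...
>     // channel 280 -> server thread 0, handler thread 0
>     // 7 distinct handler threads all funnel their slow chunk fetch work
>     // back onto the task queue of server thread 0.
>   }
> }
> {noformat}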
> In our cluster, the number of concurrent active connections to a shuffle 
> service could go as high as 6K+ during peak. If the sizes of these thread 
> pools are not configured correctly, our Spark applications are guaranteed to 
> see SASL timeout issues when a shuffle service is dealing with a lot of 
> incoming chunk fetch requests from many distinct clients, which leads to 
> stage failures and lengthy retries.
> To resolve this issue, the number of Netty server threads needs to be a 
> multiple of the number of chunk fetch handler threads. This way, the 
> round-robin policy guarantees that channels associated with different chunk 
> fetch handler threads are also associated with different Netty server 
> threads: when the server pool size is an exact multiple of the handler pool 
> size, two channels landing on the same server thread have indices congruent 
> modulo the server pool size, hence also congruent modulo the handler pool 
> size, so they share a handler thread as well. This eliminates the potential 
> burst of multiple slow-to-process requests being placed in one Netty server 
> thread's task queue.
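> For contrast with the 40/35 example above, here is the same sketch with a 
> handler pool size that divides the server pool evenly (a hypothetical 50% 
> setting yielding 20 threads):
> {noformat}
> int serverThreads = 40;
> int handlerThreads = 20;  // divides 40 evenly
> for (int channel = 0; channel <= 280; channel += 40) {
>   // Every channel pinned to server thread 0 is also pinned to handler
>   // thread 0, so at most one handler thread ever feeds this server
>   // thread's task queue with slow chunk fetch work.
>   System.out.printf("channel %3d -> server thread %d, handler thread %d%n",
>       channel, channel % serverThreads, channel % handlerThreads);
> }
> {noformat}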
> Since the current patch that's merged in Spark uses 
> `spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the 
> number of chunk fetch handler threads, and it rounds up the number, it is 
> very tricky to get these thread pool sizes configured right. In addition, 
> people who are not aware of this issue will very likely fall into this trap 
> and start seeing the RPC request timeout issue during shuffle fetch once the 
> Spark workloads in their environment reach a certain scale. For these 
> reasons, we propose to change the configurations of the number of threads 
> for both thread pools, such that if people choose to use the dedicated chunk 
> fetch handler, the number of Netty server threads will always be a multiple 
> of the number of chunk fetch handler threads.
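> As an illustration of why the percent-based setting is tricky, the handler 
> pool size is roughly derived as below (an approximation of the merged patch, 
> not the exact Spark expression; the description above only states that the 
> result is rounded up):
> {noformat}
> // Approximate derivation of the chunk fetch handler pool size from
> // spark.shuffle.server.chunkFetchHandlerThreadsPercent.
> static int chunkFetchHandlerThreads(int serverThreads, double percent) {
>   return (int) Math.ceil(serverThreads * percent / 100.0);
> }
>
> // chunkFetchHandlerThreads(40, 87)  -> 35  (does not divide 40 evenly)
> // chunkFetchHandlerThreads(40, 100) -> 40  (divides 40, k = 1)
> // Landing on a divisor of the server pool requires choosing the percentage
> // with this rounding in mind, which is easy to get wrong.
> {noformat}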


