Min Shen created SPARK-29206:
--------------------------------

             Summary: Number of shuffle Netty server threads should be a 
multiple of number of chunk fetch handler threads
                 Key: SPARK-29206
                 URL: https://issues.apache.org/jira/browse/SPARK-29206
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 3.0.0
            Reporter: Min Shen


In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool 
to handle the slow-to-process chunk fetch requests in order to improve the 
responsiveness of the shuffle service to RPC requests.

Initially, we thought that by making the number of Netty server threads larger 
than the number of chunk fetch handler threads, we would reserve some threads 
for RPC requests, thus resolving the various RPC request timeout issues we had 
experienced previously. The solution worked in our cluster initially. However, 
as the number of Spark applications in our cluster continued to increase, we 
saw the RPC request (SASL authentication specifically) timeout issue again:
{noformat}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
        at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
        at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
        at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
        at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{noformat}
After further investigation, we realized that as the number of concurrent 
clients connecting to a shuffle service increases, it becomes _VERY_ important 
to configure the number of Netty server threads and the number of chunk fetch 
handler threads correctly. Specifically, the number of Netty server threads 
needs to be a multiple of the number of chunk fetch handler threads. The reason 
is explained in detail below:

When a channel is established on the Netty server, it is registered with both 
the Netty server's default EventLoopGroup and the chunk fetch handler 
EventLoopGroup. Once registered, the channel sticks with a given thread in 
both EventLoopGroups, i.e. all requests from this channel are going to be 
handled by the same thread. Right now, the Spark shuffle Netty server uses the 
default Netty strategy to select a thread from an EventLoopGroup to be 
associated with a new channel, which is simply round-robin (Netty's 
DefaultEventExecutorChooserFactory).
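
To make the round-robin assignment concrete, here is a minimal sketch of the 
selection policy (modeled after the behavior of Netty's 
DefaultEventExecutorChooserFactory, not the actual Netty source): the n-th 
registered channel is simply pinned to thread n mod poolSize.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the round-robin selection policy implemented by
// Netty's DefaultEventExecutorChooserFactory (illustrative, not the
// actual Netty source): the n-th registered channel is pinned to
// thread n % nThreads of the EventLoopGroup.
final class RoundRobinChooser {
    private final AtomicLong idx = new AtomicLong();
    private final int nThreads;

    RoundRobinChooser(int nThreads) {
        this.nThreads = nThreads;
    }

    int next() {
        return (int) Math.abs(idx.getAndIncrement() % nThreads);
    }
}
{code}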

In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk 
fetch requests from a given channel are first added to the task queue of the 
chunk fetch handler thread associated with that channel. When a request gets 
processed, the chunk fetch handler thread submits a task to the task queue of 
the Netty server thread that is also associated with this channel. If the 
number of Netty server threads is not a multiple of the number of chunk fetch 
handler threads, this becomes a problem when the server has a large number of 
concurrent connections.
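
This two-hop flow can be sketched with plain Java executors standing in for 
the two EventLoops a channel is pinned to (an illustrative simulation, not 
Spark's actual ChunkFetchRequestHandler code):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative simulation of the two-hop flow: the slow chunk fetch work
// runs on the channel's chunk fetch handler thread, which then enqueues
// the response write onto the channel's Netty server thread (what
// channel.writeAndFlush() effectively does when called from a thread
// other than the channel's own EventLoop).
public class TwoHopFlow {
    public static void main(String[] args) {
        ExecutorService serverThread = Executors.newSingleThreadExecutor();
        ExecutorService chunkFetchThread = Executors.newSingleThreadExecutor();

        // Hop 1: the chunk fetch request lands on the handler thread.
        chunkFetchThread.submit(() -> {
            slowChunkRead(); // slow-to-process part, off the server thread
            // Hop 2: the response write is queued on the server thread.
            serverThread.submit(() -> System.out.println("response flushed"));
            serverThread.shutdown(); // run the queued write, then exit
        });
        chunkFetchThread.shutdown();
    }

    private static void slowChunkRead() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) { }
    }
}
{code}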

Assume we configure the number of Netty server threads as 40 and the percentage 
of chunk fetch handler threads as 87, which (rounding up 87% of 40 = 34.8) 
leads to 35 chunk fetch handler threads. Then, according to the round-robin 
policy, channels 0, 40, 80, 120, 160, 200, 240, and 280 will all be associated 
with the 1st Netty server thread in the default EventLoopGroup. However, since 
the chunk fetch handler thread pool only has 35 threads, out of these 8 
channels only channels 0 and 280 (280 mod 35 = 0) will be associated with the 
same chunk fetch handler thread. Thus, channels 0, 40, 80, 120, 160, 200, and 
240 will all be associated with different chunk fetch handler threads yet with 
the same Netty server thread. This means the 7 different chunk fetch handler 
threads associated with these channels could potentially submit tasks to the 
task queue of the same Netty server thread at the same time, leading to 7 
slow-to-process requests sitting in that task queue. If an RPC request is put 
in the task queue after these 7 requests, it is very likely to time out.
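
This collision pattern is easy to verify with a few lines of arithmetic (a 
self-contained check of the example above, using the same round-robin modulo 
mapping):
{code:java}
// Self-contained check of the example above: with 40 Netty server threads
// and 35 chunk fetch handler threads, the 8 channels pinned to server
// thread 0 are spread over 7 distinct chunk fetch handler threads.
public class ThreadAffinityCheck {
    public static void main(String[] args) {
        int serverThreads = 40;
        int handlerThreads = 35; // ceil(40 * 87%)

        for (int channel = 0; channel <= 280; channel += 40) {
            // Round-robin: channel n lands on thread n % poolSize.
            System.out.printf("channel %3d -> server thread %d, handler thread %d%n",
                channel, channel % serverThreads, channel % handlerThreads);
        }
    }
}
{code}
The output shows server thread 0 for every channel, while the handler thread 
cycles through 0, 5, 10, 15, 20, 25, 30, and back to 0.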

In our cluster, the number of concurrent active connections to a shuffle 
service can go as high as 6K+ during peak. If the sizes of these thread pools 
are not configured correctly, our Spark applications are guaranteed to see 
SASL timeout issues whenever a shuffle service is dealing with a lot of 
incoming chunk fetch requests from many distinct clients, which leads to stage 
failures and lengthy retries.

To resolve this issue, the number of Netty server threads needs to be a 
multiple of the number of chunk fetch handler threads. This way, the 
round-robin policy guarantees that channels associated with different chunk 
fetch handler threads will also be associated with different Netty server 
threads (when the handler thread count divides the server thread count, any 
two channel indices that are congruent modulo the server thread count are also 
congruent modulo the handler thread count, so channels sharing a Netty server 
thread also share a chunk fetch handler thread), thus eliminating this 
potential burst of multiple slow-to-process requests being placed in one Netty 
server thread's task queue.
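
Running the same check with a divisor-friendly sizing, e.g. 40 server threads 
and 20 handler threads (40 = 2 × 20; illustrative values, not a 
recommendation), shows the fix:
{code:java}
// With handlerThreads dividing serverThreads (40 = 2 x 20), every channel
// pinned to server thread 0 is also pinned to the same handler thread, so
// at most one handler thread feeds that server thread's task queue.
public class DivisorCheck {
    public static void main(String[] args) {
        int serverThreads = 40, handlerThreads = 20;
        for (int channel = 0; channel < 320; channel += serverThreads) {
            System.out.printf("channel %3d -> handler thread %d%n",
                channel, channel % handlerThreads); // always handler thread 0
        }
    }
}
{code}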

Since the patch currently merged in Spark uses 
`spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the number 
of chunk fetch handler threads, and it rounds up the resulting number, it is 
very tricky to get these thread pool sizes configured right. In addition, 
people who are not aware of this issue are very likely to fall into this trap 
and start seeing RPC request timeouts during shuffle fetch once the Spark 
workloads in their environment reach a certain scale. For these reasons, we 
propose to change the configuration of the number of threads for both thread 
pools, such that if people choose to use the dedicated chunk fetch handler, 
the number of Netty server threads is always a multiple of the number of chunk 
fetch handler threads.
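
Until such a change lands, one way to stay safe with the existing knobs 
(`spark.shuffle.io.serverThreads` and 
`spark.shuffle.server.chunkFetchHandlerThreadsPercent`) is to pick a 
percentage whose rounded-up thread count divides the server thread count. The 
sketch below mirrors the round-up behavior as described above; the exact 
formula in the merged patch may differ, and the values are examples only.
{code:java}
// Sanity-check a proposed configuration: compute the chunk fetch handler
// thread count as a percentage of server threads, rounded up, and verify
// the multiple constraint. Example values: 40 threads at 50% gives 20
// handler threads, and 40 is a multiple of 20.
public class ConfigSanityCheck {
    public static void main(String[] args) {
        int serverThreads = 40;        // spark.shuffle.io.serverThreads
        double handlerPercent = 50.0;  // chunkFetchHandlerThreadsPercent

        int handlerThreads = (int) Math.ceil(serverThreads * handlerPercent / 100.0);
        boolean isMultiple = serverThreads % handlerThreads == 0;
        // Prints: server=40, handlers=20, multiple=true
        System.out.printf("server=%d, handlers=%d, multiple=%b%n",
            serverThreads, handlerThreads, isMultiple);
    }
}
{code}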


