[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935552#comment-16935552 ]
Min Shen commented on SPARK-29206:
----------------------------------

[~redsanket], [~tgraves],

Since you worked on committing the original patch, I would appreciate your comments here.

> Number of shuffle Netty server threads should be a multiple of number of chunk fetch handler threads
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29206
>                 URL: https://issues.apache.org/jira/browse/SPARK-29206
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.0.0
>            Reporter: Min Shen
>            Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread pool to handle the slow-to-process chunk fetch requests, in order to improve the responsiveness of the shuffle service to RPC requests.
>
> Initially, we thought that by making the number of Netty server threads larger than the number of chunk fetch handler threads, we would reserve some threads for RPC requests, thus resolving the various RPC request timeout issues we had experienced previously. The solution initially worked in our cluster. However, as the number of Spark applications in our cluster continued to increase, we saw the RPC request (SASL authentication, specifically) timeout issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>         at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>         at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>         at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>         at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>         at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>         at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent clients connecting to a shuffle service increases, it becomes _VERY_ important to configure the number of Netty server threads and the number of chunk fetch handler threads correctly. Specifically, the number of Netty server threads needs to be a multiple of the number of chunk fetch handler threads. The reason is explained in detail below.
>
> When a channel is established on the Netty server, it is registered with both the Netty server's default EventLoopGroup and the chunk fetch handler EventLoopGroup. Once registered, the channel sticks with a given thread in both EventLoopGroups, i.e. all requests from the channel are handled by the same thread in each group. Right now, the Spark shuffle Netty server uses the default Netty strategy to select the thread from an EventLoopGroup to associate with a new channel, which is simply round-robin (Netty's DefaultEventExecutorChooserFactory).
>
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk fetch requests from a given channel are first added to the task queue of the chunk fetch handler thread associated with that channel. When a request is processed, the chunk fetch handler thread submits a task to the task queue of the Netty server thread that is also associated with that channel.
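>
> The following minimal, standalone sketch (plain Java; a simplified model of the behavior described above, not actual Spark or Netty code, with illustrative pool sizes) shows this round-robin pinning: each new channel is assigned the next thread in each group and stays with it for the lifetime of the connection.
> {noformat}
> import java.util.concurrent.atomic.AtomicInteger;
>
> // Simplified model of Netty's round-robin chooser. Both EventLoopGroups see
> // channel registrations in the same accept order, so the i-th channel is
> // pinned to thread (i % size) in each group.
> class RoundRobinChooser {
>   private final AtomicInteger next = new AtomicInteger();
>   private final int size;
>
>   RoundRobinChooser(int size) { this.size = size; }
>
>   // Pick the thread index for the next registered channel.
>   int pickThreadForNewChannel() {
>     return Math.floorMod(next.getAndIncrement(), size);
>   }
> }
>
> public class PinningSketch {
>   public static void main(String[] args) {
>     RoundRobinChooser serverGroup = new RoundRobinChooser(40);     // assumed pool size
>     RoundRobinChooser chunkFetchGroup = new RoundRobinChooser(35); // assumed pool size
>     for (int channel = 0; channel < 5; channel++) {
>       System.out.printf("channel %d -> server thread %d, chunk fetch thread %d%n",
>           channel,
>           serverGroup.pickThreadForNewChannel(),
>           chunkFetchGroup.pickThreadForNewChannel());
>     }
>   }
> }
> {noformat}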
> If the number of Netty server threads is not a multiple of the number of chunk fetch handler threads, this becomes a problem when the server has a large number of concurrent connections. Assume we configure the number of Netty server threads as 40 and the percentage of chunk fetch handler threads as 87, which leads to 35 chunk fetch handler threads. Then, according to the round-robin policy, channels 0, 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty server thread in the default EventLoopGroup. However, since the chunk fetch handler thread pool only has 35 threads, out of these 8 channels only channels 0 and 280 will be associated with the same chunk fetch handler thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be associated with different chunk fetch handler threads, yet with the same Netty server thread. This means the 7 different chunk fetch handler threads associated with these channels could all submit tasks to the task queue of the same Netty server thread at the same time, leaving 7 slow-to-process requests sitting in that task queue. If an RPC request is put into the task queue after these 7 requests, it is very likely to time out.
>
> In our cluster, the number of concurrent active connections to a shuffle service can go as high as 6K+ during peak hours. If the sizes of these thread pools are not configured correctly, our Spark applications are guaranteed to see SASL timeout issues whenever a shuffle service is dealing with a large number of incoming chunk fetch requests from many distinct clients, which leads to stage failures and lengthy retries.
>
> To resolve this issue, the number of Netty server threads needs to be a multiple of the number of chunk fetch handler threads. This way, the round-robin policy guarantees that channels associated with different chunk fetch handler threads are also associated with different Netty server threads, eliminating the potential burst of multiple slow-to-process requests being placed in one Netty server thread's task queue.
>
> Since the patch currently merged in Spark uses `spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the number of chunk fetch handler threads, and it rounds the resulting number up, it is very tricky to get these two pool sizes configured right. In addition, people who are not aware of this issue are very likely to fall into this trap, and will start seeing the RPC request timeout issue during shuffle fetch once the Spark workloads in their environment reach a certain scale. For these reasons, we propose to change the way the sizes of both thread pools are configured, such that if people choose to use the dedicated chunk fetch handler, the number of Netty server threads is always a multiple of the number of chunk fetch handler threads.
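>
> To make the arithmetic above concrete, the following small simulation (plain Java; a simplified model under the same round-robin assumption as the sketch above, not Spark code) counts how many distinct chunk fetch handler threads can feed tasks into a single Netty server thread's queue. With 40 server threads and 87% (rounded up to 35 chunk fetch threads, per the rounding behavior described above), it reports 7; with a pool size that divides 40 evenly, such as 20, it reports 1.
> {noformat}
> import java.util.HashSet;
> import java.util.Set;
>
> public class CollisionSketch {
>   // Pool size derived from a percentage, rounded up (the description says the
>   // merged patch rounds up): ceil(40 * 87 / 100) = 35.
>   static int chunkFetchThreadsFromPercent(int serverThreads, double percent) {
>     return (int) Math.ceil(serverThreads * percent / 100.0);
>   }
>
>   // Max number of distinct chunk fetch handler threads whose tasks can land in
>   // one Netty server thread's queue, assuming channel i is pinned to server
>   // thread (i % serverThreads) and chunk fetch thread (i % chunkThreads).
>   static int maxChunkThreadsPerServerThread(int serverThreads, int chunkThreads,
>                                             int numChannels) {
>     int max = 0;
>     for (int s = 0; s < serverThreads; s++) {
>       Set<Integer> seen = new HashSet<>();
>       for (int channel = s; channel < numChannels; channel += serverThreads) {
>         seen.add(channel % chunkThreads);
>       }
>       max = Math.max(max, seen.size());
>     }
>     return max;
>   }
>
>   public static void main(String[] args) {
>     int serverThreads = 40;
>     int chunkThreads = chunkFetchThreadsFromPercent(serverThreads, 87); // 35
>     // 6000 channels, on the order of the 6K+ peak connections described above.
>     System.out.println(maxChunkThreadsPerServerThread(serverThreads, chunkThreads, 6000)); // 7
>     System.out.println(maxChunkThreadsPerServerThread(serverThreads, 20, 6000));           // 1
>   }
> }
> {noformat}
> Under this simplified model, any chunk fetch pool size that evenly divides the server pool size collapses the worst case to a single chunk fetch thread per server thread queue, which is exactly the guarantee the proposal above asks for.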