[ https://issues.apache.org/jira/browse/SPARK-29206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935556#comment-16935556 ]
Min Shen commented on SPARK-29206:
----------------------------------

We initially tried an alternative approach to resolve this issue by
implementing a custom Netty EventExecutorChooserFactory, so that the Spark
shuffle Netty server could be a bit more intelligent about choosing which
thread in an EventLoopGroup to associate with a new channel. In the latest
versions of Netty 4.1, each (Nio|Epoll)EventLoop exposes the number of its
pending tasks and registered channels. We initially thought we could use
these metrics to do better load balancing and avoid registering a channel
with a busy EventLoop. However, as we implemented this approach, we realized
that the state of an EventLoop at channel registration time can be very
different from its state when an RPC request from this channel is later
placed in that EventLoop's task queue. Since there is no way to precisely
predict the future state of an EventLoop, we gave up on this approach. A
rough sketch of this idea is included at the end of this message for
reference.

> Number of shuffle Netty server threads should be a multiple of number of
> chunk fetch handler threads
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29206
>                 URL: https://issues.apache.org/jira/browse/SPARK-29206
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.0.0
>            Reporter: Min Shen
>            Priority: Major
>
> In SPARK-24355, we proposed to use a separate chunk fetch handler thread
> pool to handle the slow-to-process chunk fetch requests in order to improve
> the responsiveness of the shuffle service for RPC requests.
> Initially, we thought that by making the number of Netty server threads
> larger than the number of chunk fetch handler threads, we would reserve
> some threads for RPC requests, thus resolving the various RPC request
> timeout issues we had experienced previously. The solution worked in our
> cluster initially. However, as the number of Spark applications in our
> cluster continued to increase, we saw the RPC request (SASL authentication
> specifically) timeout issue again:
> {noformat}
> java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
>         at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
>         at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
>         at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
>         at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
>         at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
>         at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
> {noformat}
> After further investigation, we realized that as the number of concurrent
> clients connecting to a shuffle service increases, it becomes _VERY_
> important to configure the number of Netty server threads and the number of
> chunk fetch handler threads correctly. Specifically, the number of Netty
> server threads needs to be a multiple of the number of chunk fetch handler
> threads. The reason is explained in detail below:
> When a channel is established on the Netty server, it is registered with
> both the Netty server's default EventLoopGroup and the chunk fetch handler
> EventLoopGroup.
> Once registered, this channel sticks with a given thread in both
> EventLoopGroups, i.e., all requests from this channel are going to be
> handled by the same thread. Right now, the Spark shuffle Netty server uses
> Netty's default strategy to select the thread in an EventLoopGroup to
> associate with a new channel, which is simply round-robin (Netty's
> DefaultEventExecutorChooserFactory).
> In SPARK-24355, with the introduced chunk fetch handler thread pool, all
> chunk fetch requests from a given channel will first be added to the task
> queue of the chunk fetch handler thread associated with that channel. When
> the requests get processed, the chunk fetch handler thread will submit a
> task to the task queue of the Netty server thread that is also associated
> with this channel. If the number of Netty server threads is not a multiple
> of the number of chunk fetch handler threads, this becomes a problem when
> the server has a large number of concurrent connections.
> Assume we configure the number of Netty server threads as 40 and the
> percentage of chunk fetch handler threads as 87, which leads to 35 chunk
> fetch handler threads. Then, according to the round-robin policy, channels
> 0, 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st
> Netty server thread in the default EventLoopGroup. However, since the chunk
> fetch handler thread pool only has 35 threads, out of these 8 channels only
> channels 0 and 280 will be associated with the same chunk fetch handler
> thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be
> associated with different chunk fetch handler threads yet with the same
> Netty server thread. This means the 7 different chunk fetch handler threads
> associated with these channels could potentially submit tasks to the task
> queue of the same Netty server thread at the same time, leaving 7
> slow-to-process requests sitting in that task queue. If an RPC request is
> put in the task queue after these 7 requests, it is very likely to time
> out.
> In our cluster, the number of concurrent active connections to a shuffle
> service can go as high as 6K+ at peak. If the sizes of these thread pools
> are not configured correctly, our Spark applications are guaranteed to see
> SASL timeout issues when a shuffle service is dealing with a lot of
> incoming chunk fetch requests from many distinct clients, which leads to
> stage failures and lengthy retries.
> To resolve this issue, the number of Netty server threads needs to be a
> multiple of the number of chunk fetch handler threads. This way, the
> round-robin policy guarantees that channels associated with different chunk
> fetch handler threads are also associated with different Netty server
> threads, thus eliminating this potential burst of placing multiple
> slow-to-process requests in one Netty server thread's task queue.
> Since the current patch that is merged in Spark uses
> `spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the
> number of chunk fetch handler threads and rounds up the resulting number,
> it is very tricky to get the sizes of these thread pools configured right.
> In addition, people who are not aware of this issue are very likely to fall
> into this trap and start seeing the RPC request timeout issue during
> shuffle fetch once the Spark workloads in their environment reach a certain
> scale.
> For these reasons, we propose to change how the number of threads for both
> thread pools is configured, such that if people choose to use the dedicated
> chunk fetch handler, the number of Netty server threads will always be a
> multiple of the number of chunk fetch handler threads.
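For reference, below is a minimal sketch of the kind of load-aware chooser
mentioned in the comment above. The class name and the selection policy are
illustrative and are not the code we actually prototyped; the sketch only
relies on Netty 4.1's SingleThreadEventExecutor#pendingTasks().

{code:java}
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * Sketch only: picks the event loop with the fewest pending tasks at channel
 * registration time. As noted above, registration-time load says little about
 * the load when later RPC requests from the same channel are queued, which is
 * why we abandoned this direction.
 */
public class LeastPendingTasksChooserFactory implements EventExecutorChooserFactory {

  @Override
  public EventExecutorChooser newChooser(EventExecutor[] executors) {
    return () -> {
      EventExecutor leastLoaded = executors[0];
      int leastPending = Integer.MAX_VALUE;
      for (EventExecutor executor : executors) {
        // (Nio|Epoll)EventLoop extends SingleThreadEventExecutor, which exposes pendingTasks().
        int pending = executor instanceof SingleThreadEventExecutor
            ? ((SingleThreadEventExecutor) executor).pendingTasks()
            : 0;
        if (pending < leastPending) {
          leastPending = pending;
          leastLoaded = executor;
        }
      }
      return leastLoaded;
    };
  }
}
{code}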
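The thread collision in the 40-server-thread / 35-chunk-fetch-thread example
quoted above comes down to simple modular arithmetic. The standalone snippet
below (illustrative only, not Spark code) reproduces it: with 40 and 35
threads, up to 7 distinct chunk fetch handler threads end up feeding the same
server thread's task queue, whereas a divisor such as 20 caps it at 1.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * With Netty's default round-robin chooser, channel i is handled by server
 * thread (i % numServerThreads) and by chunk fetch handler thread
 * (i % numChunkFetchThreads).
 */
public class RoundRobinCollisionDemo {

  private static void report(int numServerThreads, int numChunkFetchThreads, int numChannels) {
    // For each server thread, collect the distinct chunk fetch handler threads
    // that can submit tasks to its task queue.
    Map<Integer, Set<Integer>> fetchThreadsPerServerThread = new HashMap<>();
    for (int channel = 0; channel < numChannels; channel++) {
      fetchThreadsPerServerThread
          .computeIfAbsent(channel % numServerThreads, k -> new HashSet<>())
          .add(channel % numChunkFetchThreads);
    }
    int worst = fetchThreadsPerServerThread.values().stream()
        .mapToInt(Set::size)
        .max()
        .orElse(0);
    System.out.printf(
        "serverThreads=%d, chunkFetchThreads=%d, channels=%d -> "
            + "up to %d chunk fetch handler threads feed a single server thread%n",
        numServerThreads, numChunkFetchThreads, numChannels, worst);
  }

  public static void main(String[] args) {
    report(40, 35, 320);  // 87% of 40, rounded up: prints "up to 7 ..."
    report(40, 20, 320);  // 40 is a multiple of 20: prints "up to 1 ..."
  }
}
{code}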
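Until the configuration change proposed above lands, one way to stay out of
this trap is to derive the chunk fetch handler thread count from the
configured percentage and then snap it down to a divisor of the server thread
count. The helper below is hypothetical (it is not Spark's TransportConf
logic); it only assumes the round-up behavior of
`spark.shuffle.server.chunkFetchHandlerThreadsPercent` described above.

{code:java}
public class ChunkFetchThreadSizing {

  // Mirrors the rounding described above: e.g. 40 server threads at 87% -> 35,
  // which does not evenly divide 40.
  static int chunkFetchThreadsFromPercent(int serverThreads, double percent) {
    return (int) Math.ceil(serverThreads * percent / 100.0);
  }

  // Snap the requested count down to the nearest divisor of the server thread
  // count, so that each server thread is only ever fed by a single chunk fetch
  // handler thread.
  static int nearestDivisorAtMost(int serverThreads, int requested) {
    for (int c = Math.min(requested, serverThreads); c >= 1; c--) {
      if (serverThreads % c == 0) {
        return c;
      }
    }
    return 1;
  }

  public static void main(String[] args) {
    int serverThreads = 40;
    int raw = chunkFetchThreadsFromPercent(serverThreads, 87);  // 35
    int adjusted = nearestDivisorAtMost(serverThreads, raw);    // 20
    System.out.println("raw=" + raw + ", adjusted=" + adjusted);
  }
}
{code}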