Min Shen created SPARK-24355:
--------------------------------

             Summary: Improve Spark shuffle server responsiveness to non-ChunkFetch requests
                 Key: SPARK-24355
                 URL: https://issues.apache.org/jira/browse/SPARK-24355
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 2.3.0
         Environment: Hadoop-2.7.4
                      Spark-2.3.0
            Reporter: Min Shen


We run Spark on YARN and deploy the Spark external shuffle service as part of the YARN NodeManager aux service.

One issue we have seen with the Spark external shuffle service is the various timeouts experienced by clients, either when registering an executor with the local shuffle server or when establishing a connection to a remote shuffle server.

Example of a timeout when establishing a connection to a remote shuffle server:
{code:java}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
        at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
        at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
....{code}
Example of a timeout when registering an executor with the local shuffle server:
{code:java}
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
        at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
        at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
        at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
{code}
While patches such as SPARK-20640 and config parameters such as spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when spark.authenticate is set to true) can help alleviate this type of problem, they do not solve the fundamental issue.
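
As a point of reference, the sketch below shows how these knobs might be raised on the client side via SparkConf; the wrapper class name and the values are purely illustrative, and raising them only masks the symptom:
{code:java}
// Minimal sketch, not a recommendation: illustrative values for the client-side
// timeout knobs mentioned above. The class name is hypothetical.
import org.apache.spark.SparkConf;

public class ShuffleTimeoutTuning {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.authenticate", "true")                  // SASL timeout only applies with authentication on
        .set("spark.shuffle.registration.timeout", "30000") // illustrative value, in ms
        .set("spark.shuffle.sasl.timeout", "120s");         // illustrative value
    System.out.println(conf.toDebugString());
  }
}
{code}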

We have observed that, when the shuffle workload gets very busy during peak hours, client requests can still time out even after these parameters are configured with very high values. Further investigation revealed the following:

Right now, the default number of server-side Netty handler threads is 2 * #cores, and it can be further tuned with the spark.shuffle.io.serverThreads parameter. Processing a client request requires one available server Netty handler thread. However, when the server Netty handler threads start processing ChunkFetchRequests, they block on disk I/O, mostly due to disk contention from the random reads initiated by all the ChunkFetchRequests received from clients. As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, all of the server-side Netty handler threads can end up blocked reading shuffle files, leaving no handler thread available to process other types of requests, which should all be very quick to process.
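
The toy program below (not Spark code; the class name and the numbers are made up) illustrates this starvation pattern: a small fixed pool stands in for the Netty handler threads, long blocking "chunk fetch" tasks occupy every thread, and a fast request has to wait for a free thread even though it needs almost no CPU:
{code:java}
// Toy illustration of handler-thread starvation; not Spark code.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandlerStarvationDemo {
  public static void main(String[] args) {
    int handlerThreads = 2;                        // stands in for 2 * #cores
    ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);

    // Simulated ChunkFetchRequests: each blocks its handler thread on "disk I/O".
    for (int i = 0; i < handlerThreads; i++) {
      handlers.submit(() -> {
        try { Thread.sleep(10_000); } catch (InterruptedException ignored) { }
      });
    }

    long queued = System.nanoTime();
    // Simulated fast request (e.g. executor registration): cheap to process,
    // but it cannot even start until a handler thread frees up.
    handlers.submit(() -> System.out.printf(
        "fast request waited %.1f s for a free handler thread%n",
        (System.nanoTime() - queued) / 1e9));
    handlers.shutdown();
  }
}
{code}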

This issue could potentially be fixed by limiting the number of Netty handler threads that can get blocked while processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequests. This allows the shuffle server to reserve Netty handler threads for non-ChunkFetchRequest messages, thus ensuring consistent processing time for these requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeout issues with either executors registering with the local shuffle server or shuffle clients establishing connections with remote shuffle servers.
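
As a rough sketch of the general Netty technique (not the actual patch; the class names, pipeline layout, and pool size below are hypothetical), the potentially blocking handler can be added to the pipeline with its own EventExecutorGroup, while the fast-path handler stays on the channel's event loop:
{code:java}
// Sketch only: isolate the blocking chunk-fetch work on a dedicated
// EventExecutorGroup so the event-loop threads remain free for fast requests.
// All class names and the pool size are hypothetical.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class ShuffleChannelInitializer extends ChannelInitializer<SocketChannel> {

  // Dedicated thread pool reserved for the handler that may block on disk I/O.
  private static final EventExecutorGroup CHUNK_FETCH_GROUP =
      new DefaultEventExecutorGroup(8); // hypothetical size

  @Override
  protected void initChannel(SocketChannel ch) {
    // Runs on the channel's event loop: serves fast requests (e.g. registration
    // RPCs) directly and forwards chunk-fetch messages down the pipeline, so the
    // event-loop threads never block.
    ch.pipeline().addLast("mainHandler", new FastRequestHandler());
    // Runs on CHUNK_FETCH_GROUP: potentially blocking shuffle-file reads happen
    // here, isolated from the event-loop threads above.
    ch.pipeline().addLast(CHUNK_FETCH_GROUP, "chunkFetchHandler", new BlockingChunkFetchHandler());
  }

  private static class FastRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // In a real server: if msg is a ChunkFetchRequest, forward it down the
      // pipeline; otherwise process it right here without blocking.
      ctx.fireChannelRead(msg);
    }
  }

  private static class BlockingChunkFetchHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // Blocking disk reads for chunk data would happen here, on a
      // CHUNK_FETCH_GROUP thread rather than on the event loop.
    }
  }
}
{code}
In a real server this initializer would be installed via ServerBootstrap#childHandler; the patch wires the equivalent separation into Spark's network transport layer.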

We will post the patch soon and would like to gather feedback from the community.



