Josh Rosen created SPARK-19529:
----------------------------------

             Summary: TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()
                 Key: SPARK-19529
                 URL: https://issues.apache.org/jira/browse/SPARK-19529
             Project: Spark
          Issue Type: Bug
          Components: Shuffle, Spark Core
    Affects Versions: 2.1.0, 2.0.0, 1.6.0
            Reporter: Josh Rosen
            Assignee: Josh Rosen


In Spark's Netty RPC layer, TransportClientFactory.createClient() calls 
awaitUninterruptibly() on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocked in this call, which can happen when a slow connection attempt has not 
yet timed out: the wait does not end on interrupt, so the task stays blocked 
until the connection attempt completes or times out. This breaks task 
cancellation when interruptOnCancel = true.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

{code}
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object@1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
{code}

I believe that we can easily fix this by using the 
InterruptedException-throwing await() instead.
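
To illustrate why the choice of wait matters, here is a minimal, JDK-only 
sketch. It does not use Netty or Spark code: a CountDownLatch that never 
completes stands in for the connection future, and the class and method names 
(InterruptibleWaitSketch, awaitUninterruptibly, exitsAfterInterrupt) are 
hypothetical. The uninterruptible helper is only assumed to approximate the 
behavior of Netty's awaitUninterruptibly(), namely continuing to wait after an 
interrupt.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptibleWaitSketch {

    // Hypothetical stand-in for an uninterruptible wait: swallow interrupts,
    // keep waiting, and restore the interrupt flag only once the wait ends.
    static void awaitUninterruptibly(CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true; // remember the interrupt but keep blocking
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a worker blocked on a latch that is not counted down, interrupts
    // it, and reports whether the worker exited within 500 ms.
    static boolean exitsAfterInterrupt(boolean interruptible) throws InterruptedException {
        CountDownLatch pendingConnect = new CountDownLatch(1); // stands in for the connect future
        Thread worker = new Thread(() -> {
            try {
                if (interruptible) {
                    pendingConnect.await(); // throws InterruptedException on interrupt
                } else {
                    awaitUninterruptibly(pendingConnect);
                }
            } catch (InterruptedException e) {
                // The interruptible wait lets the task unwind here.
            }
        });
        worker.setDaemon(true);
        worker.start();

        Thread.sleep(100);   // give the worker time to block
        worker.interrupt();  // simulate task cancellation with interruptOnCancel = true
        worker.join(500);

        boolean exited = !worker.isAlive();
        pendingConnect.countDown(); // release the worker so it does not linger
        return exited;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interruptible wait exited:   " + exitsAfterInterrupt(true));
        System.out.println("uninterruptible wait exited: " + exitsAfterInterrupt(false));
    }
}
```

In this sketch the worker using the interruptible wait exits promptly after 
the interrupt, while the worker using the uninterruptible wait remains blocked 
until the latch is released, which mirrors the zombie-task behavior described 
above.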



