GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/16866

    [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

    ## What changes were proposed in this pull request?
    
    This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's common network layer in order to fix a bug which may 
cause tasks to be uncancellable.
    
    In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocked in this call (which can happen in the event of a slow connection that 
will eventually time out), and it severely hurts task cancellation when 
`interruptOnCancel = true`.
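
    To make the failure mode concrete, here is a minimal, self-contained 
sketch (not Spark code; the class name, promise, and timeout are illustrative) 
showing that Netty's `awaitUninterruptibly()` keeps blocking straight through 
`Thread.interrupt()`, whereas `await()` would throw `InterruptedException` 
right away:

    ```java
    import io.netty.util.concurrent.DefaultEventExecutor;
    import io.netty.util.concurrent.EventExecutor;
    import io.netty.util.concurrent.Promise;

    public class AwaitInterruptDemo {
      public static void main(String[] args) throws Exception {
        EventExecutor executor = new DefaultEventExecutor();
        // A promise that is never completed, standing in for a connection
        // attempt to a dead shuffle server.
        Promise<Void> never = executor.newPromise();

        Thread waiter = new Thread(() -> {
          // Blocks through interrupts; bounded at 5s only so the demo terminates.
          never.awaitUninterruptibly(5000);
          // The interrupt was swallowed during the wait; Netty only restores
          // the thread's interrupt flag after the wait ends.
          System.out.println("woke up, interrupted = "
              + Thread.currentThread().isInterrupted());
        });
        waiter.start();
        Thread.sleep(500);
        waiter.interrupt(); // with await(), this would end the wait immediately
        waiter.join();
        executor.shutdownGracefully();
      }
    }
    ```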
    
    As an example of the impact of this problem, I experienced significant 
numbers of uncancellable "zombie tasks" on a production cluster where several 
tasks were blocked trying to connect to a dead shuffle server and then 
continued running as zombies after I cancelled the associated Spark stage. The 
zombie tasks ran for several minutes with the following stack:
    
    ```
    java.lang.Object.wait(Native Method)
    java.lang.Object.wait(Object.java:460)
    io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
    io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
    org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
    org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object@1849476028)
    org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
    org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
    org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
    org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
    org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
    org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
    org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    [...]
    ```
    
    As far as I can tell, `awaitUninterruptibly()` might have been used in 
order to avoid having to declare that methods throw `InterruptedException` 
(this code is written in Java, hence the need for checked exceptions). This 
patch simply replaces it with a regular, interruptible `await()` call.
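
    For reference, here is a paraphrased sketch of the shape of the change 
(not the exact diff; `bootstrap`, `address`, and `timeoutMs` stand in for 
`TransportClientFactory`'s fields and configuration):

    ```java
    import java.io.IOException;
    import java.net.InetSocketAddress;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;

    class ConnectSketch {
      static ChannelFuture connect(Bootstrap bootstrap, InetSocketAddress address,
          long timeoutMs) throws IOException, InterruptedException {
        ChannelFuture cf = bootstrap.connect(address);
        // Before the fix: cf.awaitUninterruptibly(timeoutMs), which ignored
        // interrupts. After the fix: an interruptible wait, so the interrupt
        // from a task cancellation propagates out of createClient().
        if (!cf.await(timeoutMs)) {
          throw new IOException("Connecting to " + address + " timed out ("
              + timeoutMs + " ms)");
        } else if (cf.cause() != null) {
          throw new IOException("Failed to connect to " + address, cf.cause());
        }
        return cf;
      }
    }
    ```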
    
    This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).
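
    Concretely, the signature changes look roughly like the following (this 
interface and method are illustrative stand-ins, not the actual Spark 
interfaces that were touched):

    ```java
    import java.io.IOException;

    // Hypothetical internal interface: the point is only that callers must
    // now handle or propagate the new checked exception.
    interface BlockClient {
      // Before: ... throws IOException;
      // After:
      void fetchBlocks(String host, int port, String[] blockIds)
          throws IOException, InterruptedException;
    }
    ```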
    
    An alternative approach would be to wrap `InterruptedException` in an 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOException`s as transient failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.
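
    A hedged sketch of that rejected alternative (illustrative only; the class 
and method names are not from the patch):

    ```java
    import java.io.IOException;

    import io.netty.channel.ChannelFuture;

    class WrappedInterruptSketch {
      // Illustrative only: this is the approach the patch deliberately avoids.
      static void awaitWrapped(ChannelFuture cf, long timeoutMs) throws IOException {
        try {
          cf.await(timeoutMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          // RetryingBlockFetcher treats IOExceptions as transient and retries,
          // so this wrapper could resurrect a fetch that was just cancelled.
          throw new IOException("Interrupted while waiting for connection", e);
        }
      }
    }
    ```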
    
    ## How was this patch tested?
    
    Manually.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-19529

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16866.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16866
    
----
commit c1c4553e32826453ed39eaaefd1cd92ef0e36382
Author: Josh Rosen <joshro...@databricks.com>
Date:   2017-02-09T07:25:29Z

    Use await() instead of awaitUninterruptibly()

----

