GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/16866
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() ## What changes were proposed in this pull request? This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's common network layer in order to fix a bug which may cause tasks to be uncancellable. In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has bad impacts on task cancellation when `interruptOnCancel = true`. As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack: ``` java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:460) io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object@1849476028}) org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: 350) org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120) org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) org.apache.spark.rdd.RDD.iterator(RDD.scala:287) [...] ``` As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call,. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility). An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transitive failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task. ## How was this patch tested? Manually. You can merge this pull request into a Git repository by running: $ git pull https://github.com/JoshRosen/spark SPARK-19529 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16866.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16866 ---- commit c1c4553e32826453ed39eaaefd1cd92ef0e36382 Author: Josh Rosen <joshro...@databricks.com> Date: 2017-02-09T07:25:29Z Use await() instead of awaitUninterruptibly() ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org