spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 23050c8a1 -> f50c4372c


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This undermines task cancellation when 
`interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object@1849476028)
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, where checked exceptions must be declared). This patch simply 
replaces that call with a regular, interruptible `await()` call.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` in an 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOException`s as transient failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, 
but those have a hard 10-second timeout and are waiting on a `close()` 
operation which is expected to complete near-instantaneously, so the impact of 
uninterruptibility there is much smaller.

Tested manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f50c4372
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f50c4372
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f50c4372

Branch: refs/heads/branch-2.0
Commit: f50c4372c3ebd91c0f6c094a7c4d1dd08f3cdb30
Parents: 23050c8
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 12:57:29 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--
 

spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2968d8c06 -> 5db234730


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This undermines task cancellation when 
`interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object@1849476028)
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, where checked exceptions must be declared). This patch simply 
replaces that call with a regular, interruptible `await()` call.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` in an 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOException`s as transient failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, 
but those have a hard 10-second timeout and are waiting on a `close()` 
operation which is expected to complete near-instantaneously, so the impact of 
uninterruptibility there is much smaller.

Tested manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5db23473
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5db23473
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5db23473

Branch: refs/heads/branch-2.1
Commit: 5db23473008a58fb9a7f77ad8b01bcdc2c5f2d9c
Parents: 2968d8c
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 12:49:37 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--
 

spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master ab88b2410 -> 1c4d10b10


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

## What changes were proposed in this pull request?

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This undermines task cancellation when 
`interruptOnCancel = true`.
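
(For context, task interruption on cancel is opt-in per job group; below is a 
minimal, hypothetical example of enabling it, with the group name and job 
purely illustrative:)

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class InterruptOnCancelExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("interrupt-demo").setMaster("local[2]"));
    // Opt in to Thread.interrupt() being delivered to running tasks when
    // this job group is cancelled.
    sc.setJobGroup("demo-group", "cancellable job", true /* interruptOnCancel */);
    sc.parallelize(Arrays.asList(1, 2, 3)).count();
    // From another thread, cancelling the group interrupts its tasks:
    // sc.cancelJobGroup("demo-group");
    sc.stop();
  }
}
```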

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object@1849476028)
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, where checked exceptions must be declared). This patch simply 
replaces that call with a regular, interruptible `await()` call.
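
(For concreteness, a minimal sketch of the before/after pattern, assuming a 
Netty `Bootstrap` and an illustrative timeout; this is not the actual Spark 
source:)

```java
import java.io.IOException;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;

class ConnectSketch {
  // Before: waits until connected or timed out, swallowing interrupts.
  // A Thread.interrupt() on the calling task has no effect here.
  static void connectUninterruptibly(Bootstrap bootstrap, String host,
      int port, long timeoutMs) throws IOException {
    ChannelFuture cf = bootstrap.connect(host, port);
    if (!cf.awaitUninterruptibly(timeoutMs)) {
      throw new IOException("Connecting to " + host + ":" + port + " timed out");
    }
  }

  // After: the same wait, but an interrupt unblocks the caller immediately,
  // at the cost of declaring the checked InterruptedException.
  static void connectInterruptibly(Bootstrap bootstrap, String host,
      int port, long timeoutMs) throws IOException, InterruptedException {
    ChannelFuture cf = bootstrap.connect(host, port);
    if (!cf.await(timeoutMs)) {
      throw new IOException("Connecting to " + host + ":" + port + " timed out");
    }
  }
}
```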

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).
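
(A hedged sketch of the kind of signature change involved; the interface and 
method names below are illustrative, not the real internal interfaces:)

```java
import java.io.IOException;

// Illustrative internal interface: switching to the interruptible await()
// forces the checked InterruptedException into every signature on the path.
interface BlockClientFactory {
  BlockClient createClient(String host, int port)
      throws IOException, InterruptedException; // previously IOException only
}

interface BlockClient extends AutoCloseable {}
```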

An alternative approach would be to wrap `InterruptedException` in an 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOException`s as transient failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.
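
(To see why the wrapping approach backfires, here is a simplified sketch of 
retry logic keyed off `IOException`; it is a stand-in, not the actual 
`RetryingBlockFetcher` code:)

```java
import java.io.IOException;

class RetrySketch {
  // Simplified stand-in for retry logic that treats IOException as a
  // transient failure worth retrying.
  static void fetchWithRetries(int maxRetries) throws IOException {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        fetchOnce();
        return;
      } catch (IOException e) {
        // A wrapped interrupt lands here and gets retried, keeping a
        // cancelled task alive even longer.
        if (attempt == maxRetries) {
          throw e;
        }
      }
    }
  }

  static void fetchOnce() throws IOException {
    try {
      Thread.sleep(10); // stand-in for the blocking connect/await
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Rejected approach: the cancellation signal now looks like an
      // ordinary transient I/O failure.
      throw new IOException(e);
    }
  }
}
```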

Note that there are three other `awaitUninterruptibly()` calls in the codebase, 
but those have a hard 10-second timeout and are waiting on a `close()` 
operation which is expected to complete near-instantaneously, so the impact of 
uninterruptibility there is much smaller.

## How was this patch tested?

Manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c4d10b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c4d10b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c4d10b1

Branch: refs/heads/master
Commit: 1c4d10b10c78d138b55e381ec6828e04fef70d6f
Parents: ab88b24
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 11:04:27 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--