Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2968d8c06 -> 5db234730


[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates a problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection that 
will eventually time out): the task ignores the interrupt and keeps waiting until 
the connection attempt completes or times out. This undermines task cancellation 
when `interruptOnCancel = true`.
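As a rough illustration of the difference, the sketch below uses the JDK's `CountDownLatch` as a stand-in for Netty's `ChannelFuture` (an assumption: it does not call Netty at all). The class `InterruptDemo` and the helper `awaitIgnoringInterrupts` are hypothetical; the helper approximates an uninterruptible timed wait by recording interrupts and restoring the interrupt flag only after the deadline, whereas `CountDownLatch.await(long, TimeUnit)` throws `InterruptedException` as soon as the waiting thread is interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptDemo {

  // Hypothetical helper approximating an uninterruptible timed wait: an
  // interrupt is recorded but does not end the wait early.
  static boolean awaitIgnoringInterrupts(CountDownLatch latch, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    boolean interrupted = false;
    try {
      while (true) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          return latch.getCount() == 0;
        }
        try {
          return latch.await(remaining, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt, then keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
      }
    }
  }

  // Runs `waiter` on a new thread, interrupts it after 100 ms, and returns
  // how long (in ms) the thread took to finish.
  static long elapsedAfterInterrupt(Runnable waiter) throws InterruptedException {
    Thread t = new Thread(waiter);
    long start = System.nanoTime();
    t.start();
    Thread.sleep(100);
    t.interrupt();
    t.join();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) throws InterruptedException {
    // A latch that is never counted down models a connection to a dead server.
    CountDownLatch neverConnects = new CountDownLatch(1);
    long timeoutMs = 2000;

    long uninterruptible = elapsedAfterInterrupt(
        () -> awaitIgnoringInterrupts(neverConnects, timeoutMs));
    long interruptible = elapsedAfterInterrupt(() -> {
      try {
        neverConnects.await(timeoutMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // the wait ends as soon as the thread is interrupted
      }
    });

    System.out.println("uninterruptible wait: ~" + uninterruptible + " ms");
    System.out.println("interruptible wait:   ~" + interruptible + " ms");
  }
}
```

With a 2 second timeout, the first wait should run for roughly the full timeout despite the interrupt, while the second should end shortly after the 100 ms mark; exact timings depend on the scheduler.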

As an example of the impact of this problem, I observed a significant number of 
uncancellable "zombie tasks" on a production cluster, where several tasks were 
blocked trying to connect to a dead shuffle server and then continued running as 
zombies after I cancelled the associated Spark stage. The zombie tasks ran for 
several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, where `InterruptedException` is a checked exception). This patch 
simply replaces it with a regular, interruptible `await()` call.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).
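The resulting pattern can be sketched as follows, again with a `CountDownLatch` standing in for the Netty connect future (an assumption). `CheckedInterruptSketch`, `Starter` and `awaitConnection` are hypothetical names; `Starter` plays the role of an internal interface such as `BlockFetchStarter`. A timeout is still reported as an `IOException`, while an interrupt now propagates as `InterruptedException`, so every caller has to handle or declare it.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CheckedInterruptSketch {

  // Hypothetical stand-in for an internal interface such as BlockFetchStarter:
  // InterruptedException is now part of the declared contract.
  interface Starter {
    void createAndStart(String host) throws IOException, InterruptedException;
  }

  // Mirrors the shape of the patched wait: a timeout becomes an IOException,
  // while an interrupt propagates unchanged as InterruptedException.
  static void awaitConnection(CountDownLatch connected, String address, long timeoutMs)
      throws IOException, InterruptedException {
    if (!connected.await(timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new IOException(
          String.format("Connecting to %s timed out (%s ms)", address, timeoutMs));
    }
  }

  public static void main(String[] args) {
    CountDownLatch neverConnects = new CountDownLatch(1);
    Starter starter = host -> awaitConnection(neverConnects, host, 50);

    // Runnable.run() cannot throw checked exceptions, so a caller running on
    // its own thread must handle the new exception explicitly.
    Runnable task = () -> {
      try {
        starter.createAndStart("dead-host:7337");
      } catch (IOException e) {
        System.out.println("timed out: " + e.getMessage());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        System.out.println("interrupted");
      }
    };
    task.run();
  }
}
```

The `Runnable` here restores the interrupt flag when it catches `InterruptedException`; the patch's `TransportClientFactorySuite` instead rethrows it wrapped in a `RuntimeException`. Either way, the compiler forces the caller to deal with the interrupt explicitly.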

An alternative approach would be to wrap `InterruptedException` into 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOException`s as transient failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.
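A simplified model of that retry decision shows why wrapping would be harmful. The `RetryPolicySketch` class and its `shouldRetry` rule are assumptions written for illustration (the rule treats an `IOException`, or an exception whose direct cause is one, as transient); they are modeled on the behavior described above and are not a copy of `RetryingBlockFetcher`.

```java
import java.io.IOException;

public class RetryPolicySketch {

  // Hypothetical, simplified retry rule: an IOException (or an exception
  // directly caused by one) is treated as transient and retried while
  // retries remain.
  static boolean shouldRetry(Throwable e, int retryCount, int maxRetries) {
    boolean isIOException = e instanceof IOException || e.getCause() instanceof IOException;
    return isIOException && retryCount < maxRetries;
  }

  public static void main(String[] args) {
    InterruptedException interrupt = new InterruptedException("task cancelled");

    // Rejected alternative: wrap the interrupt so interfaces keep `throws IOException`.
    IOException wrapped = new IOException("interrupted while connecting", interrupt);

    System.out.println("wrapped interrupt retried? " + shouldRetry(wrapped, 0, 3));
    System.out.println("raw interrupt retried?     " + shouldRetry(interrupt, 0, 3));
  }
}
```

Under this rule, the wrapped interrupt is classified as a transient I/O failure and retried, whereas the raw `InterruptedException` is not.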

Note that there are three other `awaitUninterruptibly()` calls in the codebase, 
but those calls have a hard 10 second timeout and are waiting on a `close()` 
operation which is expected to complete near instantaneously, so the impact of 
uninterruptibility there is much smaller.

This patch was tested manually.

Author: Josh Rosen <joshro...@databricks.com>

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5db23473
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5db23473
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5db23473

Branch: refs/heads/branch-2.1
Commit: 5db23473008a58fb9a7f77ad8b01bcdc2c5f2d9c
Parents: 2968d8c
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Feb 13 12:49:37 2017 -0800

----------------------------------------------------------------------
 .../network/client/TransportClientFactory.java      | 10 ++++++----
 .../spark/network/TransportClientFactorySuite.java  |  6 ++++--
 .../network/shuffle/ExternalShuffleClient.java      |  4 ++--
 .../spark/network/shuffle/RetryingBlockFetcher.java |  3 ++-
 .../shuffle/mesos/MesosExternalShuffleClient.java   |  2 +-
 .../spark/network/sasl/SaslIntegrationSuite.java    |  4 ++--
 .../shuffle/ExternalShuffleIntegrationSuite.java    |  2 +-
 .../shuffle/ExternalShuffleSecuritySuite.java       |  7 ++++---
 .../network/shuffle/RetryingBlockFetcherSuite.java  | 16 ++++++++--------
 9 files changed, 30 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index cb10edf..b50e043 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -122,7 +122,8 @@ public class TransportClientFactory implements Closeable {
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+  public TransportClient createClient(String remoteHost, int remotePort)
+      throws IOException, InterruptedException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     // Use unresolved address here to avoid DNS resolution each time we creates a client.
@@ -190,13 +191,14 @@ public class TransportClientFactory implements Closeable {
    * As with {@link #createClient(String, int)}, this method is blocking.
    */
   public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
+      throws IOException, InterruptedException {
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     return createClient(address);
   }
 
   /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
+  private TransportClient createClient(InetSocketAddress address)
+      throws IOException, InterruptedException {
     logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
@@ -223,7 +225,7 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionTimeoutMs())) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 44d16d5..43e63ef 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -100,6 +100,8 @@ public class TransportClientFactorySuite {
             clients.add(client);
           } catch (IOException e) {
             failed.incrementAndGet();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
         }
       };
@@ -143,7 +145,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void returnDifferentClientsForDifferentServers() throws IOException {
+  public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -172,7 +174,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void closeBlockClientsWithFactory() throws IOException {
+  public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 772fb88..eea5cf7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -101,7 +101,7 @@ public class ExternalShuffleClient extends ShuffleClient {
         new RetryingBlockFetcher.BlockFetchStarter() {
           @Override
           public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-              throws IOException {
+              throws IOException, InterruptedException {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
           }
@@ -136,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String host,
       int port,
       String execId,
-      ExecutorShuffleInfo executorInfo) throws IOException {
+      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
     TransportClient client = clientFactory.createUnmanagedClient(host, port);
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 72bd0f8..5be8550 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -57,7 +57,8 @@ public class RetryingBlockFetcher {
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+         throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 42cedd9..c6d6029 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -69,7 +69,7 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
       String host,
       int port,
       long heartbeatTimeoutMs,
-      long heartbeatIntervalMs) throws IOException {
+      long heartbeatIntervalMs) throws IOException, InterruptedException {
 
     checkInit();
     ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 6ba937d..81a3f06 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -103,7 +103,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testGoodClient() throws IOException {
+  public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList(
         new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -133,7 +133,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testNoSaslClient() throws IOException {
+  public void testNoSaslClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 552b536..a04b682 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -240,7 +240,7 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
-      throws IOException {
+      throws IOException, InterruptedException {
     ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index a0f69ca..6ef1a2d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -59,7 +59,7 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testValid() throws IOException {
+  public void testValid() throws IOException, InterruptedException {
     validate("my-app-id", "secret", false);
   }
 
@@ -82,12 +82,13 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testEncryption() throws IOException {
+  public void testEncryption() throws IOException, InterruptedException {
     validate("my-app-id", "secret", true);
   }
 
   /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+  private void validate(String appId, String secretKey, boolean encrypt)
+        throws IOException, InterruptedException {
     ExternalShuffleClient client =
       new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
     client.init(appId);

http://git-wip-us.apache.org/repos/asf/spark/blob/5db23473/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 91882e3..f221544 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -66,7 +66,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testNoFailures() throws IOException {
+  public void testNoFailures() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -85,7 +85,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testUnrecoverableFailure() throws IOException {
+  public void testUnrecoverableFailure() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -104,7 +104,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnFirst() throws IOException {
+  public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -127,7 +127,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnSecond() throws IOException {
+  public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -149,7 +149,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testTwoIOExceptions() throws IOException {
+  public void testTwoIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -177,7 +177,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testThreeIOExceptions() throws IOException {
+  public void testThreeIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -209,7 +209,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testRetryAndUnrecoverable() throws IOException {
+  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -252,7 +252,7 @@ public class RetryingBlockFetcherSuite {
   @SuppressWarnings("unchecked")
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                           BlockFetchingListener listener)
-    throws IOException {
+    throws IOException, InterruptedException {
 
     TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);


