Repository: spark
Updated Branches:
  refs/heads/branch-2.0 23050c8a1 -> f50c4372c

[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). Because the wait ignores the interrupt, the task cannot be cancelled promptly when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those calls have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete nearly instantaneously, so the impact of uninterruptibility there is much smaller.

Tested manually.

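To illustrate the behavioural difference described above, here is a minimal standalone sketch. It does not use Netty or Spark: a `CountDownLatch` that never opens stands in for the connection future, and the helper `awaitUninterruptibly` is a hypothetical imitation of a timed wait that swallows interrupts until its deadline. The class name, method names and timeout values are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptibleWaitSketch {

  // Hypothetical stand-in for an uninterruptible timed wait: interrupts are
  // swallowed and the wait resumes until the latch opens or the deadline passes.
  static boolean awaitUninterruptibly(CountDownLatch latch, long timeoutMs) {
    boolean interrupted = false;
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      while (true) {
        try {
          return latch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt, keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
      }
    }
  }

  // Blocks a worker thread on a latch that never opens, interrupts it (as task
  // cancellation would), and reports how the wait ended.
  static String runWaiter(boolean interruptible, long timeoutMs) {
    CountDownLatch neverCompletes = new CountDownLatch(1);
    AtomicReference<String> outcome = new AtomicReference<>("unknown");
    Thread task = new Thread(() -> {
      try {
        boolean done = interruptible
            ? neverCompletes.await(timeoutMs, TimeUnit.MILLISECONDS)
            : awaitUninterruptibly(neverCompletes, timeoutMs);
        outcome.set(done ? "completed" : "timed out");
      } catch (InterruptedException e) {
        outcome.set("interrupted");
      }
    });
    task.start();
    try {
      Thread.sleep(50); // give the worker time to block
      task.interrupt();
      task.join();
    } catch (InterruptedException e) {
      throw new IllegalStateException("driver thread was interrupted", e);
    }
    return outcome.get();
  }

  public static void main(String[] args) {
    System.out.println("await():                " + runWaiter(true, 300));
    System.out.println("awaitUninterruptibly(): " + runWaiter(false, 300));
  }
}
```

In this sketch the interruptible wait ends as soon as the worker is interrupted, while the uninterruptible variant only returns after the full timeout has elapsed. With a connection timeout measured in minutes rather than milliseconds, that remaining wait is the window in which a cancelled task keeps running.
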
Author: Josh Rosen <joshro...@databricks.com>

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f50c4372
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f50c4372
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f50c4372

Branch: refs/heads/branch-2.0
Commit: f50c4372c3ebd91c0f6c094a7c4d1dd08f3cdb30
Parents: 23050c8
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Feb 13 12:57:29 2017 -0800

----------------------------------------------------------------------
 .../network/client/TransportClientFactory.java      | 10 ++++++----
 .../spark/network/TransportClientFactorySuite.java  |  6 ++++--
 .../network/shuffle/ExternalShuffleClient.java      |  4 ++--
 .../spark/network/shuffle/RetryingBlockFetcher.java |  3 ++-
 .../shuffle/mesos/MesosExternalShuffleClient.java   |  2 +-
 .../spark/network/sasl/SaslIntegrationSuite.java    |  4 ++--
 .../shuffle/ExternalShuffleIntegrationSuite.java    |  2 +-
 .../shuffle/ExternalShuffleSecuritySuite.java       |  7 ++++---
 .../network/shuffle/RetryingBlockFetcherSuite.java  | 16 ++++++++--------
 9 files changed, 30 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 6c21446..5d01983 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -122,7 +122,8 @@ public class TransportClientFactory implements Closeable {
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+  public TransportClient createClient(String remoteHost, int remotePort)
+      throws IOException, InterruptedException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     // Use unresolved address here to avoid DNS resolution each time we creates a client.
@@ -190,13 +191,14 @@ public class TransportClientFactory implements Closeable {
    * As with {@link #createClient(String, int)}, this method is blocking.
    */
   public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
+      throws IOException, InterruptedException {
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     return createClient(address);
   }

   /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
+  private TransportClient createClient(InetSocketAddress address)
+      throws IOException, InterruptedException {
     logger.debug("Creating new connection to {}", address);

     Bootstrap bootstrap = new Bootstrap();
@@ -223,7 +225,7 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionTimeoutMs())) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 44d16d5..43e63ef 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -100,6 +100,8 @@ public class TransportClientFactorySuite {
             clients.add(client);
           } catch (IOException e) {
             failed.incrementAndGet();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
         }
       };
@@ -143,7 +145,7 @@ public class TransportClientFactorySuite {
   }

   @Test
-  public void returnDifferentClientsForDifferentServers() throws IOException {
+  public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -172,7 +174,7 @@ public class TransportClientFactorySuite {
   }

   @Test
-  public void closeBlockClientsWithFactory() throws IOException {
+  public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 58ca87d..fa6a06e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -101,7 +101,7 @@ public class ExternalShuffleClient extends ShuffleClient {
         new RetryingBlockFetcher.BlockFetchStarter() {
           @Override
           public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-              throws IOException {
+              throws IOException, InterruptedException {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
           }
@@ -136,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String host,
       int port,
       String execId,
-      ExecutorShuffleInfo executorInfo) throws IOException {
+      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
     TransportClient client = clientFactory.createUnmanagedClient(host, port);
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index d81cf86..dc0686f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -57,7 +57,8 @@ public class RetryingBlockFetcher {
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+         throws IOException, InterruptedException;
   }

   /** Shared executor service used for waiting and retrying. */

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 2add9c8..af32c30 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -69,7 +69,7 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
       String host,
       int port,
       long heartbeatTimeoutMs,
-      long heartbeatIntervalMs) throws IOException {
+      long heartbeatIntervalMs) throws IOException, InterruptedException {

     checkInit();
     ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 6ba937d..81a3f06 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -103,7 +103,7 @@ public class SaslIntegrationSuite {
   }

   @Test
-  public void testGoodClient() throws IOException {
+  public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList(
         new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -133,7 +133,7 @@ public class SaslIntegrationSuite {
   }

   @Test
-  public void testNoSaslClient() throws IOException {
+  public void testNoSaslClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList());


http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 552b536..a04b682 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -240,7 +240,7 @@ public class ExternalShuffleIntegrationSuite {
   }

   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
-      throws IOException {
+      throws IOException, InterruptedException {
     ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index a0f69ca..6ef1a2d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -59,7 +59,7 @@ public class ExternalShuffleSecuritySuite {
   }

   @Test
-  public void testValid() throws IOException {
+  public void testValid() throws IOException, InterruptedException {
     validate("my-app-id", "secret", false);
   }

@@ -82,12 +82,13 @@ public class ExternalShuffleSecuritySuite {
   }

   @Test
-  public void testEncryption() throws IOException {
+  public void testEncryption() throws IOException, InterruptedException {
     validate("my-app-id", "secret", true);
   }

   /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+  private void validate(String appId, String secretKey, boolean encrypt)
+    throws IOException, InterruptedException {
     ExternalShuffleClient client =
       new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
     client.init(appId);

http://git-wip-us.apache.org/repos/asf/spark/blob/f50c4372/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 91882e3..f221544 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -66,7 +66,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testNoFailures() throws IOException {
+  public void testNoFailures() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -85,7 +85,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testUnrecoverableFailure() throws IOException {
+  public void testUnrecoverableFailure() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -104,7 +104,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testSingleIOExceptionOnFirst() throws IOException {
+  public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -127,7 +127,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testSingleIOExceptionOnSecond() throws IOException {
+  public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -149,7 +149,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testTwoIOExceptions() throws IOException {
+  public void testTwoIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -177,7 +177,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testThreeIOExceptions() throws IOException {
+  public void testThreeIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -209,7 +209,7 @@ public class RetryingBlockFetcherSuite {
   }

   @Test
-  public void testRetryAndUnrecoverable() throws IOException {
+  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);

     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -252,7 +252,7 @@ public class RetryingBlockFetcherSuite {
   @SuppressWarnings("unchecked")
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                           BlockFetchingListener listener)
-    throws IOException {
+    throws IOException, InterruptedException {

     TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org