This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec28925 [SPARK-31179] Fast fail the connection while last connection
failed in fast fail time window
ec28925 is described below
commit ec289252368127f8261eb6a2270362ba0b65db36
Author: turbofei <[email protected]>
AuthorDate: Thu Apr 2 08:18:14 2020 -0500
[SPARK-31179] Fast fail the connection while last connection failed in fast
fail time window
## What changes were proposed in this pull request?
In TransportClientFactory, the requests sent to the same address share a
clientPool.
Specifically, when io.numConnectionsPerPeer is 1, these requests
share the same client.
When this address is unreachable, each createClient operation still has to
wait until it times out.
And these requests would block each other during createClient, because
there is a lock for this shared client.
As a result, it takes connectionNum \* connectionTimeOut \* maxRetry of
retrying before the task finally fails.
In fact, the task is expected to fail within connectionTimeOut \*
maxRetry.
In this PR, I set a fastFail time window for the clientPool: if the last
connection attempt failed within this time window, a new connection attempt fails fast.
## Why are the changes needed?
It can save time in some cases.
## Does this PR introduce any user-facing change?
No.
## How was this patch tested?
Existing UT.
Closes #27943 from turboFei/SPARK-31179-fast-fail-connection.
Authored-by: turbofei <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
---
.../network/client/TransportClientFactory.java | 42 +++++++++++++++++++---
.../{ => client}/TransportClientFactorySuite.java | 28 +++++++++++++--
.../network/shuffle/ExternalBlockStoreClient.java | 4 +--
.../network/netty/NettyBlockTransferService.scala | 4 +--
.../netty/NettyBlockTransferServiceSuite.scala | 2 +-
5 files changed, 68 insertions(+), 12 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index c9ef9f9..24c436a 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import com.codahale.metrics.MetricSet;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -61,6 +62,7 @@ public class TransportClientFactory implements Closeable {
private static class ClientPool {
TransportClient[] clients;
Object[] locks;
+ volatile long lastConnectionFailed;
ClientPool(int size) {
clients = new TransportClient[size];
@@ -68,6 +70,7 @@ public class TransportClientFactory implements Closeable {
for (int i = 0; i < size; i++) {
locks[i] = new Object();
}
+ lastConnectionFailed = 0;
}
}
@@ -86,6 +89,7 @@ public class TransportClientFactory implements Closeable {
private EventLoopGroup workerGroup;
private final PooledByteBufAllocator pooledAllocator;
private final NettyMemoryMetrics metrics;
+ private final int fastFailTimeWindow;
public TransportClientFactory(
TransportContext context,
@@ -112,6 +116,7 @@ public class TransportClientFactory implements Closeable {
}
this.metrics = new NettyMemoryMetrics(
this.pooledAllocator, conf.getModuleName() + "-client", conf);
+ fastFailTimeWindow = (int)(conf.ioRetryWaitTimeMs() * 0.95);
}
public MetricSet getAllMetrics() {
@@ -121,18 +126,27 @@ public class TransportClientFactory implements Closeable {
/**
* Create a {@link TransportClient} connecting to the given remote host /
port.
*
- * We maintains an array of clients (size determined by
spark.shuffle.io.numConnectionsPerPeer)
+ * We maintain an array of clients (size determined by
spark.shuffle.io.numConnectionsPerPeer)
* and randomly picks one to use. If no client was previously created in the
randomly selected
* spot, this function creates a new client and places it there.
*
+ * If the fastFail parameter is true, fail immediately when the last attempt
to the same address
+ * failed within the fast fail time window (95 percent of the io wait retry
timeout). The
+ * assumption is the caller will handle retrying.
+ *
* Prior to the creation of a new TransportClient, we will execute all
* {@link TransportClientBootstrap}s that are registered with this factory.
*
* This blocks until a connection is successfully established and fully
bootstrapped.
*
* Concurrency: This method is safe to call from multiple threads.
+ *
+ * @param remoteHost remote address host
+ * @param remotePort remote address port
+ * @param fastFail whether this call should fail immediately when the last
attempt to the same
+ * address failed within the last fast fail time window.
*/
- public TransportClient createClient(String remoteHost, int remotePort)
+ public TransportClient createClient(String remoteHost, int remotePort,
boolean fastFail)
throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
@@ -192,11 +206,30 @@ public class TransportClientFactory implements Closeable {
logger.info("Found inactive connection to {}, creating a new one.",
resolvedAddress);
}
}
- clientPool.clients[clientIndex] = createClient(resolvedAddress);
+ // If this connection should fast fail when last connection failed in
last fast fail time
+ // window and it did, fail this connection directly.
+ if (fastFail && System.currentTimeMillis() -
clientPool.lastConnectionFailed <
+ fastFailTimeWindow) {
+ throw new IOException(
+ String.format("Connecting to %s failed in the last %s ms, fail this
connection directly",
+ resolvedAddress, fastFailTimeWindow));
+ }
+ try {
+ clientPool.clients[clientIndex] = createClient(resolvedAddress);
+ clientPool.lastConnectionFailed = 0;
+ } catch (IOException e) {
+ clientPool.lastConnectionFailed = System.currentTimeMillis();
+ throw e;
+ }
return clientPool.clients[clientIndex];
}
}
+ public TransportClient createClient(String remoteHost, int remotePort)
+ throws IOException, InterruptedException {
+ return createClient(remoteHost, remotePort, false);
+ }
+
/**
* Create a completely new {@link TransportClient} to the given remote host
/ port.
* This connection is not pooled.
@@ -210,7 +243,8 @@ public class TransportClientFactory implements Closeable {
}
/** Create a completely new {@link TransportClient} to the remote address. */
- private TransportClient createClient(InetSocketAddress address)
+ @VisibleForTesting
+ TransportClient createClient(InetSocketAddress address)
throws IOException, InterruptedException {
logger.debug("Creating new connection to {}", address);
diff --git
a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
similarity index 88%
rename from
common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
rename to
common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
index 9b76981..ea0ac51 100644
---
a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++
b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.network;
+package org.apache.spark.network.client;
import java.io.IOException;
import java.util.Collections;
@@ -29,14 +29,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
@@ -224,4 +226,24 @@ public class TransportClientFactorySuite {
factory.close();
factory.createClient(TestUtils.getLocalHost(), server1.getPort());
}
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void fastFailConnectionInTimeWindow() throws IOException,
InterruptedException {
+ TransportClientFactory factory = context.createClientFactory();
+ TransportServer server = context.createServer();
+ int unreachablePort = server.getPort();
+ server.close();
+ try {
+ factory.createClient(TestUtils.getLocalHost(), unreachablePort, true);
+ } catch (Exception e) {
+ assert(e instanceof IOException);
+ }
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("fail this connection directly");
+ factory.createClient(TestUtils.getLocalHost(), unreachablePort, true);
+ expectedException = ExpectedException.none();
+ }
}
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d6185f0..51dc333 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -101,11 +101,12 @@ public class ExternalBlockStoreClient extends
BlockStoreClient {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host,
port, execId);
try {
+ int maxRetries = conf.maxIORetries();
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
// Unless this client is closed.
if (clientFactory != null) {
- TransportClient client = clientFactory.createClient(host, port);
+ TransportClient client = clientFactory.createClient(host, port,
maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, downloadFileManager).start();
} else {
@@ -113,7 +114,6 @@ public class ExternalBlockStoreClient extends
BlockStoreClient {
}
};
- int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid
it just in case there's
// a bug in this code. We should remove the if statement once we're
sure of the stability.
diff --git
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ffb6960..3de7377 100644
---
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -115,11 +115,12 @@ private[spark] class NettyBlockTransferService(
tempFileManager: DownloadFileManager): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
+ val maxRetries = transportConf.maxIORetries()
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
override def createAndStart(blockIds: Array[String],
listener: BlockFetchingListener): Unit = {
try {
- val client = clientFactory.createClient(host, port)
+ val client = clientFactory.createClient(host, port, maxRetries > 0)
new OneForOneBlockFetcher(client, appId, execId, blockIds,
listener,
transportConf, tempFileManager).start()
} catch {
@@ -136,7 +137,6 @@ private[spark] class NettyBlockTransferService(
}
}
- val maxRetries = transportConf.maxIORetries()
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid
it just in case there's
// a bug in this code. We should remove the if statement once we're
sure of the stability.
diff --git
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index edddf88..c804102 100644
---
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -105,7 +105,7 @@ class NettyBlockTransferServiceSuite
// This is used to touch an IOException during fetching block.
when(client.sendRpc(any(), any())).thenAnswer(_ => {throw new
IOException()})
var createClientCount = 0
- when(clientFactory.createClient(any(), any())).thenAnswer(_ => {
+ when(clientFactory.createClient(any(), any(), any())).thenAnswer(_ => {
createClientCount += 1
client
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]