Repository: spark
Updated Branches:
  refs/heads/master 689386b1c -> 6e5fc3788


[SPARK-11252][NETWORK] ShuffleClient should release connection after fetching
blocks has completed for external shuffle

With YARN's external shuffle service, each executor's ExternalShuffleClient keeps
its connections to YARN's NodeManager open until the application completes, so the
NodeManager and the executors end up holding many socket connections.
To reduce the network pressure on the NodeManager's shuffle service, the connection
to the NodeManager should be released once registerWithShuffleServer or fetchBlocks
has completed in ExternalShuffleClient.

andrewor14 rxin vanzin

Author: Lianhui Wang <lianhuiwan...@gmail.com>

Closes #9227 from lianhuiwang/spark-11252.
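
For readers skimming the diff below, the change has two parts: the shuffle client's
TransportContext is now created with closeIdleConnections = true, so the client-side
channel handler closes block-fetch connections that stay quiet past the configured
connection timeout, and the one-off executor-registration RPC now uses an unmanaged
(unpooled) client that is closed explicitly. A minimal sketch of the opt-in path,
not taken from the patch; the host and port are placeholders:

    // A TransportContext created with closeIdleConnections = true lets
    // TransportChannelHandler close connections that have been idle past the
    // configured timeout, even when no requests are outstanding.
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
    TransportClientFactory factory = context.createClientFactory();
    TransportClient client = factory.createClient("nodemanager-host", 7337);  // placeholders
    // ... fetch blocks ...
    // Once the connection has been idle long enough, the handler closes it.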


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e5fc378
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e5fc378
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e5fc378

Branch: refs/heads/master
Commit: 6e5fc37883ed81c3ee2338145a48de3036d19399
Parents: 689386b
Author: Lianhui Wang <lianhuiwan...@gmail.com>
Authored: Tue Nov 10 10:40:08 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Nov 10 10:40:08 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/ExternalShuffleService.scala   |  3 +-
 .../apache/spark/network/TransportContext.java  | 11 ++++++-
 .../network/client/TransportClientFactory.java  | 10 ++++++
 .../network/server/TransportChannelHandler.java | 26 +++++++++------
 .../network/TransportClientFactorySuite.java    | 34 ++++++++++++++++++++
 .../network/shuffle/ExternalShuffleClient.java  | 12 ++++---
 6 files changed, 81 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 6840a3a..a039d54 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -47,7 +47,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
-  private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)
+  private val transportContext: TransportContext =
+    new TransportContext(transportConf, blockHandler, true)
 
   private var server: TransportServer = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/TransportContext.java 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 43900e6..1b64b86 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -59,15 +59,24 @@ public class TransportContext {
 
   private final TransportConf conf;
   private final RpcHandler rpcHandler;
+  private final boolean closeIdleConnections;
 
   private final MessageEncoder encoder;
   private final MessageDecoder decoder;
 
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
+    this(conf, rpcHandler, false);
+  }
+
+  public TransportContext(
+      TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections) {
     this.conf = conf;
     this.rpcHandler = rpcHandler;
     this.encoder = new MessageEncoder();
     this.decoder = new MessageDecoder();
+    this.closeIdleConnections = closeIdleConnections;
   }
 
   /**
@@ -144,7 +153,7 @@ public class TransportContext {
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
       rpcHandler);
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs());
+      conf.connectionTimeoutMs(), closeIdleConnections);
   }
 
   public TransportConf getConf() { return conf; }
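
Note that the existing two-argument constructor delegates to the new three-argument
one with closeIdleConnections = false, so current call sites keep the previous
keep-alive behavior and only callers that opt in get idle-connection closing. An
illustrative snippet (conf and rpcHandler assumed in scope):

    TransportContext rpcContext = new TransportContext(conf, rpcHandler);            // defaults to false
    TransportContext shuffleContext = new TransportContext(conf, rpcHandler, true);  // idle connections closed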

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 4952ffb..42a4f66 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -158,6 +158,16 @@ public class TransportClientFactory implements Closeable {
     }
   }
 
+  /**
+   * Create a completely new {@link TransportClient} to the given remote host / port.
+   * This connection is not pooled.
+   */
+  public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
+      throws IOException {
+    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+    return createClient(address);
+  }
+
   /** Create a completely new {@link TransportClient} to the remote address. */
   private TransportClient createClient(InetSocketAddress address) throws IOException {
     logger.debug("Creating new connection to " + address);

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 8e0ee70..f8fcd1c 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -55,16 +55,19 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
   private final TransportResponseHandler responseHandler;
   private final TransportRequestHandler requestHandler;
   private final long requestTimeoutNs;
+  private final boolean closeIdleConnections;
 
   public TransportChannelHandler(
       TransportClient client,
       TransportResponseHandler responseHandler,
       TransportRequestHandler requestHandler,
-      long requestTimeoutMs) {
+      long requestTimeoutMs,
+      boolean closeIdleConnections) {
     this.client = client;
     this.responseHandler = responseHandler;
     this.requestHandler = requestHandler;
     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
+    this.closeIdleConnections = closeIdleConnections;
   }
 
   public TransportClient getClient() {
@@ -111,16 +114,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
       IdleStateEvent e = (IdleStateEvent) evt;
       // See class comment for timeout semantics. In addition to ensuring we only timeout while
       // there are outstanding requests, we also do a secondary consistency check to ensure
-      // there's no race between the idle timeout and incrementing the numOutstandingRequests.
-      boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
+      // there's no race between the idle timeout and incrementing the numOutstandingRequests
+      // (see SPARK-7003).
       boolean isActuallyOverdue =
         System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
-      if (e.state() == IdleState.ALL_IDLE && hasInFlightRequests && isActuallyOverdue) {
-        String address = NettyUtils.getRemoteAddress(ctx.channel());
-        logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
-          "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
-          "is wrong.", address, requestTimeoutNs / 1000 / 1000);
-        ctx.close();
+      if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
+        if (responseHandler.numOutstandingRequests() > 0) {
+          String address = NettyUtils.getRemoteAddress(ctx.channel());
+          logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
+            "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
+            "is wrong.", address, requestTimeoutNs / 1000 / 1000);
+          ctx.close();
+        } else if (closeIdleConnections) {
+          // If closeIdleConnections is enabled, also close idle connections.
+          ctx.close();
+        }
       }
     }
   }
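
The branch above reacts to Netty IdleStateEvents. The exact pipeline wiring is not
part of this diff, but roughly an io.netty.handler.timeout.IdleStateHandler with an
all-idle threshold derived from the connection timeout feeds those events into
userEventTriggered. A sketch, with the timeout plumbing assumed:

    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;

    // Fires an ALL_IDLE IdleStateEvent when no reads or writes occur for the given
    // number of seconds; TransportChannelHandler.userEventTriggered then decides
    // whether to close the channel (outstanding requests, or opted-in idle close).
    void addIdleStateHandler(SocketChannel ch, long connectionTimeoutMs) {
      ch.pipeline().addLast("idleStateHandler",
        new IdleStateHandler(0, 0, (int) (connectionTimeoutMs / 1000)));
    }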

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
 
b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 35de5e5..f447137 100644
--- 
a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ 
b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -37,6 +38,7 @@ import org.apache.spark.network.client.TransportClientFactory;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.ConfigProvider;
 import org.apache.spark.network.util.SystemPropertyConfigProvider;
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.MapConfigProvider;
@@ -177,4 +179,36 @@ public class TransportClientFactorySuite {
     assertFalse(c1.isActive());
     assertFalse(c2.isActive());
   }
+
+  @Test
+  public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
+    TransportConf conf = new TransportConf(new ConfigProvider() {
+
+      @Override
+      public String get(String name) {
+        if ("spark.shuffle.io.connectionTimeout".equals(name)) {
+          // We should make sure there is enough time for us to observe the channel is active
+          return "1s";
+        }
+        String value = System.getProperty(name);
+        if (value == null) {
+          throw new NoSuchElementException(name);
+        }
+        return value;
+      }
+    });
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
+    TransportClientFactory factory = context.createClientFactory();
+    try {
+      TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+      assertTrue(c1.isActive());
+      long expiredTime = System.currentTimeMillis() + 10000; // 10 seconds
+      while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+        Thread.sleep(10);
+      }
+      assertFalse(c1.isActive());
+    } finally {
+      factory.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e5fc378/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index ea6d248..ef3a9dc 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -78,7 +78,7 @@ public class ExternalShuffleClient extends ShuffleClient {
   @Override
   public void init(String appId) {
     this.appId = appId;
-    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
     List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
     if (saslEnabled) {
       bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
@@ -137,9 +137,13 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       ExecutorShuffleInfo executorInfo) throws IOException {
     checkInit();
-    TransportClient client = clientFactory.createClient(host, port);
-    byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
-    client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+    TransportClient client = clientFactory.createUnmanagedClient(host, port);
+    try {
+      byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
+      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+    } finally {
+      client.close();
+    }
   }
 
   @Override

