This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c01152d  [SPARK-23182][CORE] Allow enabling TCP keep alive on the RPC connections
c01152d is described below

commit c01152dd22093e9f5d2aa533598e4d4209d30922
Author: Petar Petrov <petar.pet...@leanplum.com>
AuthorDate: Sun Jan 13 13:39:12 2019 -0600

    [SPARK-23182][CORE] Allow enabling TCP keep alive on the RPC connections
    
    ## What changes were proposed in this pull request?
    
    Make it possible for the master to enable TCP keep alive on the RPC connections with clients.
    
    ## How was this patch tested?
    
    Manually tested.
    
    Added the following:
    ```
    spark.rpc.io.enableTcpKeepAlive  true
    ```
    to spark-defaults.conf.
    
    Observed the following on the Spark master:
    ```
    $ netstat -town | grep 7077
    tcp6       0      0 10.240.3.134:7077       10.240.1.25:42851       ESTABLISHED keepalive (6736.50/0/0)
    tcp6       0      0 10.240.3.134:44911      10.240.3.134:7077       ESTABLISHED keepalive (4098.68/0/0)
    tcp6       0      0 10.240.3.134:7077       10.240.3.134:44911      ESTABLISHED keepalive (4098.68/0/0)
    ```
    
    This shows that the keep-alive setting is taking effect.
    
    It's currently possible to enable TCP keep alive on the worker / executor, but it is not possible to configure it on other RPC connections. It's unclear to me why this is the case. Keep alive is more important for the master, to protect it against suddenly departing workers / executors, thus I think it's very important to have it. In particular, this makes the master resilient when using preemptible worker VMs in GCE. GCE has the concept of shutdown scripts, which it doesn't guaran [...]
    
    This enables keep-alive on connections besides the master's connections, but that shouldn't cause harm.
    
    Closes #20512 from peshopetrov/master.
    
    Authored-by: Petar Petrov <petar.pet...@leanplum.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../java/org/apache/spark/network/server/TransportServer.java  |  4 ++++
 .../main/java/org/apache/spark/network/util/TransportConf.java | 10 ++++++++++
 2 files changed, 14 insertions(+)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index a0ecde2..9b327d5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -126,6 +126,10 @@ public class TransportServer implements Closeable {
       bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
     }
 
+    if (conf.enableTcpKeepAlive()) {
+      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+    }
+
     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel ch) {
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 89ee5ee..3628da6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -42,6 +42,7 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
   private final String SPARK_NETWORK_IO_LAZYFD_KEY;
   private final String SPARK_NETWORK_VERBOSE_METRICS;
+  private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
 
   private final ConfigProvider conf;
 
@@ -64,6 +65,7 @@ public class TransportConf {
     SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
     SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
+    SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
   }
 
   public int getInt(String name, int defaultValue) {
@@ -174,6 +176,14 @@ public class TransportConf {
   }
 
   /**
+   * Whether to enable TCP keep-alive. If true, the TCP keep-alives are enabled, which removes
+   * connections that are idle for too long.
+   */
+  public boolean enableTcpKeepAlive() {
+    return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
+  }
+
+  /**
    * Maximum number of retries when binding to a port before giving up.
    */
   public int portMaxRetries() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to