This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new d2e709e1d [CELEBORN-1882] Support configuring the SSL handshake 
timeout for SSLHandler
d2e709e1d is described below

commit d2e709e1deb2dcc0bc252976d98fecba69f85e26
Author: Minchu Yang <[email protected]>
AuthorDate: Thu Feb 27 15:43:32 2025 -0600

    [CELEBORN-1882] Support configuring the SSL handshake timeout for SSLHandler
    
    ### What changes were proposed in this pull request?
    Support configuring the SSL handshake timeout for SSLHandler, for `rpc_app` 
and `rpc_service` transport modules.
    
    ### Why are the changes needed?
    To make the SSLHandler handshake timeout configurable. We are working on 
ramping shuffle traffic to Celeborn internally, and have observed Spark task 
failures related to connection timeouts. This change allows the SSLHandler 
handshake timeout to be aligned with our ecosystem’s production config, which 
should minimize those failures and improve robustness.
    
    ### Does this PR introduce _any_ user-facing change?
    Introduces a new server-side configuration.
    
    ### How was this patch tested?
    Added a new unit test, and validated the change with existing unit tests.
    
    Closes #3120 from rmcyang/rmcyang/CELEBORN-1882.
    
    Authored-by: Minchu Yang <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 44d772df75e17cb485d1e4aca086e5fc5b2bd2d6)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../celeborn/common/network/TransportContext.java  |  1 +
 .../network/client/TransportClientFactory.java     |  3 +++
 .../common/network/util/TransportConf.java         |  5 +++++
 .../org/apache/celeborn/common/CelebornConf.scala  | 20 +++++++++++++++++
 .../common/network/util/TransportConfSuiteJ.java   | 26 ++++++++++++++++++++++
 docs/configuration/network.md                      |  1 +
 6 files changed, 56 insertions(+)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index fe6cd5c77..4a60f47da 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -187,6 +187,7 @@ public class TransportContext implements Closeable {
         SslHandler sslHandler;
         try {
           sslHandler = new SslHandler(sslFactory.createSSLEngine(isClient, 
channel.alloc()));
+          sslHandler.setHandshakeTimeoutMillis(conf.sslHandshakeTimeoutMs());
         } catch (Exception e) {
           throw new IllegalStateException("Error creating Netty SslHandler", 
e);
         }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 7b87be40b..a6cf3eef4 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -87,6 +87,7 @@ public class TransportClientFactory implements Closeable {
 
   private final int connectTimeoutMs;
   private final int connectionTimeoutMs;
+  private final int sslHandshakeTimeoutMs;
 
   private final int receiveBuf;
 
@@ -106,6 +107,7 @@ public class TransportClientFactory implements Closeable {
     this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
     this.connectTimeoutMs = conf.connectTimeoutMs();
     this.connectionTimeoutMs = conf.connectionTimeoutMs();
+    this.sslHandshakeTimeoutMs = conf.sslHandshakeTimeoutMs();
     this.receiveBuf = conf.receiveBuf();
     this.sendBuf = conf.sendBuf();
     this.rand = new Random();
@@ -310,6 +312,7 @@ public class TransportClientFactory implements Closeable {
     }
     if (context.sslEncryptionEnabled()) {
       final SslHandler sslHandler = 
cf.channel().pipeline().get(SslHandler.class);
+      sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs);
       Future<Channel> future =
           sslHandler
               .handshakeFuture()
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index c7a0b4497..5fb57cbac 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -225,6 +225,11 @@ public class TransportConf {
     return celebornConf.maxSslEncryptedBlockSize(module);
   }
 
+  /** The timeout in milliseconds for the SSL handshake */
+  public int sslHandshakeTimeoutMs() {
+    return celebornConf.sslHandshakeTimeoutMs(module);
+  }
+
   // suppressing to ensure clarity of code.
   @SuppressWarnings("RedundantIfStatement")
   public boolean sslEnabledAndKeysAreValid() {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b46ddf4f3..bdda07b71 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1356,6 +1356,13 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     getSslConfig(SSL_TRUST_STORE_RELOAD_INTERVAL_MS, module).toInt
   }
 
+  /**
+   * The timeout in milliseconds for the SSL handshake
+   */
+  def sslHandshakeTimeoutMs(module: String): Int = {
+    getSslConfig(SSL_HANDSHAKE_TIMEOUT_MS, module).toInt
+  }
+
   /**
    * Internal config: the max size when chunking the stream with SSL
    */
@@ -5310,6 +5317,19 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val SSL_HANDSHAKE_TIMEOUT_MS: ConfigEntry[Long] =
+    buildConf("celeborn.ssl.<module>.sslHandshakeTimeoutMs")
+      .categories("network", "ssl")
+      .version("0.6.0")
+      .doc("The timeout for the SSL handshake (in milliseconds). The default 
value is set to " +
+        s"the current Netty default. This is applicable for 
`${TransportModuleConstants.RPC_APP_MODULE}` " +
+        s"and `${TransportModuleConstants.RPC_SERVICE_MODULE}` modules")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(
+        p => p > 0 && p <= Int.MaxValue,
+        s"Invalid sslHandshakeTimeoutMs, must be a position number upto 
${Int.MaxValue}")
+      .createWithDefaultString("10s")
+
   val SECRET_REDACTION_PATTERN =
     buildConf("celeborn.redaction.regex")
       .categories("master", "worker")
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
index e03d1b536..b217a1d81 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/util/TransportConfSuiteJ.java
@@ -73,6 +73,32 @@ public class TransportConfSuiteJ {
     assertEquals(10000, transportConf.sslTrustStoreReloadIntervalMs());
   }
 
+  @Test
+  public void testSslHandshakeTimeoutMs() {
+    assertEquals(10000, transportConf.sslHandshakeTimeoutMs());
+
+    final String module1 = "rpc_app";
+    final String module2 = "rpc_service";
+
+    final int module1HandshakeTimeoutMs = 20000;
+    final int module2HandshakeTimeoutMs = 30000;
+
+    CelebornConf conf = new CelebornConf();
+
+    conf.set(
+        "celeborn.ssl." + module1 + ".sslHandshakeTimeoutMs",
+        Integer.toString(module1HandshakeTimeoutMs));
+    conf.set(
+        "celeborn.ssl." + module2 + ".sslHandshakeTimeoutMs",
+        Integer.toString(module2HandshakeTimeoutMs));
+
+    TransportConf module1TransportConf = new TransportConf(module1, conf);
+    TransportConf module2TransportConf = new TransportConf(module2, conf);
+
+    assertEquals(module1HandshakeTimeoutMs, 
module1TransportConf.sslHandshakeTimeoutMs());
+    assertEquals(module2HandshakeTimeoutMs, 
module2TransportConf.sslHandshakeTimeoutMs());
+  }
+
   // If a specific key is not set, it should be inherited from celeborn.ssl 
namespace
   @Test
   public void testInheritance() {
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 5b86ae554..cb10b16e4 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -59,6 +59,7 @@ license: |
 | celeborn.ssl.&lt;module&gt;.keyStore | &lt;undefined&gt; | false | Path to 
the key store file.<br/> The path can be absolute or relative to the directory 
in which the process is started. | 0.5.0 |  | 
 | celeborn.ssl.&lt;module&gt;.keyStorePassword | &lt;undefined&gt; | false | 
Password to the key store. | 0.5.0 |  | 
 | celeborn.ssl.&lt;module&gt;.protocol | TLSv1.2 | false | TLS protocol to 
use.<br/> The protocol must be supported by JVM.<br/> The reference list of 
protocols can be found in the "Additional JSSE Standard Names" section of the 
Java security guide. For Java 11, for example, the list can be found 
[here](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#additional-jsse-standard-names)
 | 0.5.0 |  | 
+| celeborn.ssl.&lt;module&gt;.sslHandshakeTimeoutMs | 10s | false | The 
timeout for the SSL handshake (in milliseconds). The default value is set to 
the current Netty default. This is applicable for `rpc_app` and `rpc_service` 
modules | 0.6.0 |  | 
 | celeborn.ssl.&lt;module&gt;.trustStore | &lt;undefined&gt; | false | Path to 
the trust store file.<br/> The path can be absolute or relative to the 
directory in which the process is started. | 0.5.0 |  | 
 | celeborn.ssl.&lt;module&gt;.trustStorePassword | &lt;undefined&gt; | false | 
Password for the trust store. | 0.5.0 |  | 
 | celeborn.ssl.&lt;module&gt;.trustStoreReloadIntervalMs | 10s | false | The 
interval at which the trust store should be reloaded (in milliseconds), when 
enabled. This setting is mostly only useful for server components, not 
applications. | 0.5.0 |  | 

Reply via email to