This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 84620f2b877 [SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions
84620f2b877 is described below

commit 84620f2b877b9ea52b95343ca46d069a906e28a9
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Jun 30 18:33:16 2023 +0800

    [SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions
    
    ### What changes were proposed in this pull request?
    
    This PR treats io.connectionTimeout/connectionCreationTimeout as zero when 
it is set to a negative value. Zero here means
    - connectionCreationTimeout = 0, an unlimited CONNECT_TIMEOUT_MILLIS for 
connection establishment
    - connectionTimeout = 0, the `IdleStateHandler` that triggers 
`IdleStateEvent` is disabled.
    
    ### Why are the changes needed?
    
    1. This PR fixes a bug when connectionCreationTimeout is 0: netty treats 0 
as unlimited, but ChannelFuture.await(0) returns immediately without waiting, 
so the connection attempt is inappropriately reported as timed out.
    2. This PR fixes a bug when connectionCreationTimeout is less than 0, which 
causes meaningless transport client reconnections and endless executor 
reconstructions
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new unit tests
    
    Closes #41785 from yaooqinn/SPARK-44241.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../network/client/TransportClientFactory.java     | 16 +++++++++--
 .../apache/spark/network/util/TransportConf.java   |  4 +--
 .../client/TransportClientFactorySuite.java        | 33 +++++++++++++++++++---
 3 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 43408d43e57..188e4ba0f8e 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -249,12 +249,13 @@ public class TransportClientFactory implements Closeable {
     logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
+    int connCreateTimeout = conf.connectionCreationTimeoutMs();
     bootstrap.group(workerGroup)
       .channel(socketChannelClass)
       // Disable Nagle's Algorithm since we don't want packets to wait
       .option(ChannelOption.TCP_NODELAY, true)
       .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.connectionCreationTimeoutMs())
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
       .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
     if (conf.receiveBuf() > 0) {
@@ -280,10 +281,19 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+    if (connCreateTimeout <= 0) {
+      cf.awaitUninterruptibly();
+      assert cf.isDone();
+      if (cf.isCancelled()) {
+        throw new IOException(String.format("Connecting to %s cancelled", 
address));
+      } else if (!cf.isSuccess()) {
+        throw new IOException(String.format("Failed to connect to %s", 
address), cf.cause());
+      }
+    } else if (!cf.await(connCreateTimeout)) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)",
-          address, conf.connectionCreationTimeoutMs()));
+          address, connCreateTimeout));
     } else if (cf.cause() != null) {
       throw new IOException(String.format("Failed to connect to %s", address), 
cf.cause());
     }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 9dedd5d9849..7c2a408d86d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -103,7 +103,7 @@ public class TransportConf {
       conf.get("spark.network.timeout", "120s"));
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
       conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS 
+ "s")) * 1000;
-    return (int) defaultTimeoutMs;
+    return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
   }
 
   /** Connect creation timeout in milliseconds. Default 30 secs. */
@@ -111,7 +111,7 @@ public class TransportConf {
     long connectionTimeoutS = 
TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
       conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY,  
connectionTimeoutS + "s")) * 1000;
-    return (int) defaultTimeoutMs;
+    return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. */
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
index 277ff85db7b..dbbe1540cff 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java
@@ -31,10 +31,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.spark.network.TestUtils;
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@ import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.TransportConf;
 
+import static org.junit.Assert.*;
+
 public class TransportClientFactorySuite {
   private TransportConf conf;
   private TransportContext context;
@@ -239,4 +237,31 @@ public class TransportClientFactorySuite {
     Assert.assertThrows("fail this connection directly", IOException.class,
       () -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, 
true));
   }
+
+  @Test
+  public void unlimitedConnectionAndCreationTimeouts() throws IOException, 
InterruptedException {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("spark.shuffle.io.connectionTimeout", "-1");
+    configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
+    TransportConf conf = new TransportConf("shuffle", new 
MapConfigProvider(configMap));
+    RpcHandler rpcHandler = new NoOpRpcHandler();
+    try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
+      TransportClientFactory factory = ctx.createClientFactory()){
+      TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), 
server1.getPort());
+      assertTrue(c1.isActive());
+      long expiredTime = System.currentTimeMillis() + 5000;
+      while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+        Thread.sleep(10);
+      }
+      assertTrue(c1.isActive());
+      // When connectionCreationTimeout is unlimited, the connection shall be 
able to
+      // fail when the server is not reachable.
+      TransportServer server = ctx.createServer();
+      int unreachablePort = server.getPort();
+      JavaUtils.closeQuietly(server);
+      IOException exception = Assert.assertThrows(IOException.class,
+          () -> factory.createClient(TestUtils.getLocalHost(), 
unreachablePort, true));
+      assertNotEquals(exception.getCause(), null);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to