Repository: spark Updated Branches: refs/heads/master dcd1e42d6 -> 2bda1c1d3
[SPARK-5444][Network]Add a retry to deal with the conflict port in netty server. If `spark.blockManager.port` conflicts with a port that is already in use, Spark throws an exception and exits. Add a retry on subsequent ports to avoid this situation. Author: huangzhaowei <carlmartin...@gmail.com> Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits: cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bda1c1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bda1c1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bda1c1d Branch: refs/heads/master Commit: 2bda1c1d376afd8abe6a04be345461752f3fb1b6 Parents: dcd1e42 Author: huangzhaowei <carlmartin...@gmail.com> Authored: Fri Feb 6 14:35:29 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Fri Feb 6 14:36:58 2015 -0800 ---------------------------------------------------------------------- .../spark/network/server/TransportServer.java | 36 ++++++++++++++++++-- .../spark/network/util/TransportConf.java | 7 ++++ 2 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2bda1c1d/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 625c325..ef20999 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -100,8 +100,7 @@ public class TransportServer implements Closeable { } }); - channelFuture = bootstrap.bind(new 
InetSocketAddress(portToBind)); - channelFuture.syncUninterruptibly(); + bindRightPort(portToBind); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port :" + port); @@ -123,4 +122,37 @@ public class TransportServer implements Closeable { bootstrap = null; } + /** + * Attempt to bind to the specified port up to a fixed number of retries. + * If all attempts fail after the max number of retries, exit. + */ + private void bindRightPort(int portToBind) { + int maxPortRetries = conf.portMaxRetries(); + + for (int i = 0; i <= maxPortRetries; i++) { + int tryPort = -1; + if (0 == portToBind) { + // Do not increment port if tryPort is 0, which is treated as a special port + tryPort = 0; + } else { + // If the new port wraps around, do not try a privilege port + tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024; + } + try { + channelFuture = bootstrap.bind(new InetSocketAddress(tryPort)); + channelFuture.syncUninterruptibly(); + return; + } catch (Exception e) { + logger.warn("Netty service could not bind on port " + tryPort + + ". Attempting the next port."); + if (i >= maxPortRetries) { + logger.error(e.getMessage() + ": Netty server failed after " + + maxPortRetries + " retries."); + + // If it can't find a right port, it should exit directly. 
+ System.exit(-1); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2bda1c1d/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 6c91786..2eaf3b7 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -98,4 +98,11 @@ public class TransportConf { public boolean lazyFileDescriptor() { return conf.getBoolean("spark.shuffle.io.lazyFD", true); } + + /** + * Maximum number of retries when binding to a port before giving up. + */ + public int portMaxRetries() { + return conf.getInt("spark.port.maxRetries", 16); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org