This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0671395 [SPARK-27989][CORE] Added retries on the connection to the driver for k8s 0671395 is described below commit 06713959668c6d8015bd79b79f22cdb5fb5a32e1 Author: Jose Luis Pedrosa <jlpedr...@gmail.com> AuthorDate: Mon Jun 24 09:25:43 2019 -0500 [SPARK-27989][CORE] Added retries on the connection to the driver for k8s Disabled negative dns caching for docker images Improved logging on DNS resolution, convenient for slow k8s clusters ## What changes were proposed in this pull request? Added retries when building the connection to the driver in K8s. In some scenarios DNS resolution can take more than the timeout. Also openjdk-8 by default has negative dns caching enabled, which means even retries may not help depending on the times. ## How was this patch tested? This patch was tested against a specific k8s cluster with slow response time in DNS to ensure it works. Closes #24702 from jlpedrosa/feature/kuberetries. Authored-by: Jose Luis Pedrosa <jlpedr...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../spark/network/client/TransportClientFactory.java | 5 +++-- .../spark/executor/CoarseGrainedExecutorBackend.scala | 14 +++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index a8e2715..3bc8729 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -172,10 +172,11 @@ public class TransportClientFactory implements Closeable { final long preResolveHost = System.nanoTime(); final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort); final long hostResolveTimeMs = 
(System.nanoTime() - preResolveHost) / 1000000; + final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed"; if (hostResolveTimeMs > 2000) { - logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); + logger.warn("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs); } else { - logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); + logger.trace("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs); } synchronized (clientPool.locks[clientIndex]) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2f4fc0e..98e5aa6 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -262,7 +262,19 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { executorConf, new SecurityManager(executorConf), clientMode = true) - val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl) + + var driver: RpcEndpointRef = null + val nTries = 3 + for (i <- 0 until nTries if driver == null) { + try { + driver = fetcher.setupEndpointRefByURI(arguments.driverUrl) + } catch { + case e: Throwable => if (i == nTries - 1) { + throw e + } + } + } + val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId)) fetcher.shutdown() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org