This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 77cd133 [SPARK-35174][K8S] Avoid opening watch when waitAppCompletion is false 77cd133 is described below commit 77cd133e966cdbdf5dc2e60f8adeca2f5511a924 Author: Dmytro Melnychenko <dmytro.i...@gmail.com> AuthorDate: Fri Sep 24 11:04:59 2021 -0700 [SPARK-35174][K8S] Avoid opening watch when waitAppCompletion is false ### What changes were proposed in this pull request? Don't open a pod watch when it is not needed. ### Why are the changes needed? In spark-submit, we currently open a pod watch for every Spark submission. If WAIT_FOR_APP_COMPLETION is false, we then immediately ignore the watcher's result and break out of the watch loop. When submitting Spark applications at scale, this is a source of operational pain, because opening the watch requires opening a websocket, which tends to run into subtle networking issues while negotiating the websocket connection. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Standard tests Closes #34095 from slothspot/spark-35174. 
Authored-by: Dmytro Melnychenko <dmytro.i...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../k8s/submit/KubernetesClientApplication.scala | 37 ++++++++++++---------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index e3b80b1..3c3c425 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -151,23 +151,26 @@ private[spark] class Client( kubernetesClient.pods().delete(createdDriverPod) throw e } - val sId = Seq(conf.namespace, driverPodName).mkString(":") - breakable { - while (true) { - val podWithName = kubernetesClient - .pods() - .withName(driverPodName) - // Reset resource to old before we start the watch, this is important for race conditions - watcher.reset() - watch = podWithName.watch(watcher) - - // Send the latest pod state we know to the watcher to make sure we didn't miss anything - watcher.eventReceived(Action.MODIFIED, podWithName.get()) - - // Break the while loop if the pod is completed or we don't want to wait - if(watcher.watchOrStop(sId)) { - watch.close() - break + + if (conf.get(WAIT_FOR_APP_COMPLETION)) { + val sId = Seq(conf.namespace, driverPodName).mkString(":") + breakable { + while (true) { + val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + // Reset resource to old before we start the watch, this is important for race conditions + watcher.reset() + watch = podWithName.watch(watcher) + + // Send the latest pod state we know to the watcher to make sure we didn't miss anything + watcher.eventReceived(Action.MODIFIED, podWithName.get()) + + // 
Break the while loop if the pod is completed or we don't want to wait + if (watcher.watchOrStop(sId)) { + watch.close() + break + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org