This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new ef1441b [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s ef1441b is described below commit ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca Author: Jim Kleckner <j...@cloudphysics.com> AuthorDate: Tue Nov 24 10:20:54 2020 -0800 [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s ### What changes were proposed in this pull request? This patch processes the HTTP Gone event and restarts the pod watcher. ### Why are the changes needed? This is a backport of PR #28423 to branch-2.4. The reasons are explained in SPARK-24266: Spark jobs using the k8s resource scheduler may hang. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Manually. Closes #30283 from jkleckner/shockdm-2.4.6-spark-submit-fix. Lead-authored-by: Jim Kleckner <j...@cloudphysics.com> Co-authored-by: Dmitriy Drinfeld <dmitriy.drinf...@ibm.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../k8s/submit/KubernetesClientApplication.scala | 62 +++++++++++++--------- .../k8s/submit/LoggingPodStatusWatcher.scala | 56 +++++++++++++++---- .../spark/deploy/k8s/submit/ClientSuite.scala | 4 +- 3 files changed, 87 insertions(+), 35 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index edeaa38..cbda8a7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,12 +17,15 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter +import java.net.HttpURLConnection.HTTP_GONE import java.util.{Collections, 
UUID} import java.util.Properties import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch} +import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.mutable +import scala.util.control.Breaks._ import scala.util.control.NonFatal import org.apache.spark.SparkConf @@ -133,29 +136,38 @@ private[spark] class Client( .endVolume() .endSpec() .build() - Utils.tryWithResource( - kubernetesClient - .pods() - .withName(resolvedDriverPod.getMetadata.getName) - .watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { - val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) - kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { - case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { - logInfo(s"Waiting for application $appName to finish...") - watcher.awaitCompletion() - logInfo(s"Application $appName finished.") - } else { - logInfo(s"Deployed Spark application $appName into Kubernetes.") + val driverPodName = resolvedDriverPod.getMetadata.getName + var watch: Watch = null + val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() + } catch { + case NonFatal(e) => + kubernetesClient.pods().delete(createdDriverPod) + throw e + } + val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") + breakable { + while (true) { + val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + // Reset 
resource to old before we start the watch, this is important for race conditions + watcher.reset() + + watch = podWithName.watch(watcher) + + // Send the latest pod state we know to the watcher to make sure we didn't miss anything + watcher.eventReceived(Action.MODIFIED, podWithName.get()) + + // Break the while loop if the pod is completed or we don't want to wait + if(watcher.watchOrStop(sId)) { + watch.close() + break + } } } } @@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None - val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) + val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, + loggingInterval, + waitForAppCompletion) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 4a7d3d4..8f45941 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.k8s.submit +import java.net.HttpURLConnection.HTTP_GONE import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ @@ -29,7 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def awaitCompletion(): Unit + def watchOrStop(submissionId: String): Boolean + def reset(): Unit } /** @@ -42,13 +44,20 @@ private[k8s] 
trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, - maybeLoggingInterval: Option[Long]) + maybeLoggingInterval: Option[Long], + waitForCompletion: Boolean) extends LoggingPodStatusWatcher with Logging { + private var podCompleted = false + + private var resourceTooOldReceived: Boolean = false + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } @@ -57,6 +66,10 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + override def reset(): Unit = { + resourceTooOldReceived = false + } + def start(): Unit = { maybeLoggingInterval.foreach { interval => scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) @@ -79,7 +92,12 @@ private[k8s] class LoggingPodStatusWatcherImpl( override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") - closeWatch() + if (e != null && e.getCode == HTTP_GONE) { + resourceTooOldReceived = true + logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e") + } else { + closeWatch() + } } private def logShortStatus() = { @@ -97,6 +115,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def closeWatch(): Unit = { podCompletedFuture.countDown() scheduler.shutdown() + podCompleted = true } private def formatPodState(pod: Pod): String = { @@ -134,13 +153,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( }.mkString("") } - override def awaitCompletion(): Unit = { - podCompletedFuture.await() - logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" - }.getOrElse("No containers were found in the driver pod.")) - } - 
private def containersDescription(p: Pod): String = { p.getStatus.getContainerStatuses.asScala.map { status => Seq( @@ -177,4 +189,28 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { + logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...") + val interval = maybeLoggingInterval + + synchronized { + while (!podCompleted && !resourceTooOldReceived) { + wait(interval.get) + logInfo(s"Application status for $appId (phase: $phase)") + } + } + + if(podCompleted) { + logInfo( + pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") + } + podCompleted + } else { + logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") + // Always act like the application has completed since we don't want to wait for app completion + true + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e791..d997d42 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) + when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME)) + 
.thenReturn(true) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) @@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { loggingPodStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(loggingPodStatusWatcher).awaitCompletion() + verify(loggingPodStatusWatcher).watchOrStop(kubernetesConf.namespace + ":driver") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org