This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 02c5b4f [SPARK-28947][K8S] Status logging not happens at an interval for liveness 02c5b4f is described below commit 02c5b4f76337cc3901b8741887292bb4478931f3 Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Tue Oct 15 12:34:39 2019 -0700 [SPARK-28947][K8S] Status logging not happens at an interval for liveness ### What changes were proposed in this pull request? This PR invokes the start method of `LoggingPodStatusWatcherImpl` for status logging at intervals. ### Why are the changes needed? The start method of `LoggingPodStatusWatcherImpl` is declared but never called; this PR invokes it. ### Does this PR introduce any user-facing change? no ### How was this patch tested? manually tested Closes #25648 from yaooqinn/SPARK-28947. Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../k8s/submit/KubernetesClientApplication.scala | 25 ++------- .../k8s/submit/LoggingPodStatusWatcher.scala | 61 ++++++++++------------ .../spark/deploy/k8s/submit/ClientSuite.scala | 5 +- 3 files changed, 33 insertions(+), 58 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 11bbad9..8e5532d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -86,15 +86,12 @@ private[spark] object ClientArguments { * @param builder Responsible for building the base driver pod based on a composition of * implemented features. 
* @param kubernetesClient the client to talk to the Kubernetes API server - * @param waitForAppCompletion a flag indicating whether the client should wait for the application - * to complete * @param watcher a watcher that monitors and logs the application status */ private[spark] class Client( conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, kubernetesClient: KubernetesClient, - waitForAppCompletion: Boolean, watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { @@ -124,10 +121,11 @@ private[spark] class Client( .endVolume() .endSpec() .build() + val driverPodName = resolvedDriverPod.getMetadata.getName Utils.tryWithResource( kubernetesClient .pods() - .withName(resolvedDriverPod.getMetadata.getName) + .withName(driverPodName) .watch(watcher)) { _ => val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) try { @@ -141,16 +139,8 @@ private[spark] class Client( throw e } - val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" + - s"${resolvedDriverPod.getMetadata.getName}" - if (waitForAppCompletion) { - logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...") - watcher.awaitCompletion() - logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.") - } else { - logInfo(s"Deployed Spark application ${conf.appName} with " + - s"submission ID ${sId} into Kubernetes.") - } + val sId = Seq(conf.namespace, driverPodName).mkString(":") + watcher.watchOrStop(sId) } } @@ -199,13 +189,11 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { - val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // For constructing the app ID, we can't use the Spark application name, as the app ID is going // to be added as a label to group resources belonging to the same application. Label values are // considerably restrictive, e.g. 
must be no longer than 63 characters in length. So we generate // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}" - val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, kubernetesAppId, @@ -215,9 +203,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) - val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None - - val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) + val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, @@ -231,7 +217,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication { kubernetesConf, new KubernetesDriverBuilder(), kubernetesClient, - waitForAppCompletion, watcher) client.run() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index f16d1f3..ce3c80c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -16,49 +16,36 @@ */ package org.apache.spark.deploy.k8s.submit -import java.util.concurrent.{CountDownLatch, TimeUnit} - import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import 
io.fabric8.kubernetes.client.Watcher.Action +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.KubernetesDriverConf import org.apache.spark.deploy.k8s.KubernetesUtils._ import org.apache.spark.internal.Logging -import org.apache.spark.util.ThreadUtils private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def awaitCompletion(): Unit + def watchOrStop(submissionId: String): Unit } /** * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on * every state change and also at an interval for liveness. * - * @param appId application ID. - * @param maybeLoggingInterval ms between each state request. If provided, must be a positive - * number. + * @param conf kubernetes driver conf. */ -private[k8s] class LoggingPodStatusWatcherImpl( - appId: String, - maybeLoggingInterval: Option[Long]) +private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) extends LoggingPodStatusWatcher with Logging { - private val podCompletedFuture = new CountDownLatch(1) - // start timer for periodic logging - private val scheduler = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") - private val logRunnable: Runnable = () => logShortStatus() + private val appId = conf.appId + + private var podCompleted = false private var pod = Option.empty[Pod] private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") - def start(): Unit = { - maybeLoggingInterval.foreach { interval => - scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) - } - } - override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) action match { @@ -78,11 +65,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( closeWatch() } - private def logShortStatus() = { - logInfo(s"Application status for $appId (phase: $phase)") - } - - private def logLongStatus() = { + private def logLongStatus(): Unit = { logInfo("State changed, new state: " + 
pod.map(formatPodState).getOrElse("unknown")) } @@ -90,15 +73,25 @@ private[k8s] class LoggingPodStatusWatcherImpl( phase == "Succeeded" || phase == "Failed" } - private def closeWatch(): Unit = { - podCompletedFuture.countDown() - scheduler.shutdown() + private def closeWatch(): Unit = synchronized { + podCompleted = true + this.notifyAll() } - override def awaitCompletion(): Unit = { - podCompletedFuture.await() - logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" - }.getOrElse("No containers were found in the driver pod.")) + override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) { + logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + val interval = conf.get(REPORT_INTERVAL) + synchronized { + while (!podCompleted) { + wait(interval) + logInfo(s"Application status for $appId (phase: $phase)") + } + } + logInfo( + pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${conf.appName} with submission ID $sId finished") + } else { + logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 2cc7f8e..5d49ac0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -146,7 +146,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kconf, driverBuilder, kubernetesClient, - false, loggingPodStatusWatcher) submissionClient.run() verify(podOperations).create(FULL_EXPECTED_POD) @@ -157,7 +156,6 @@ class ClientSuite extends 
SparkFunSuite with BeforeAndAfter { kconf, driverBuilder, kubernetesClient, - false, loggingPodStatusWatcher) submissionClient.run() val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues @@ -181,9 +179,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kconf, driverBuilder, kubernetesClient, - true, loggingPodStatusWatcher) submissionClient.run() - verify(loggingPodStatusWatcher).awaitCompletion() + verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org