[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r529783873 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, -maybeLoggingInterval: Option[Long]) +maybeLoggingInterval: Option[Long], +waitForCompletion: Boolean) Review comment: ~Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.~ ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None -val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) +val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, + loggingInterval, + waitForAppCompletion) Review comment: ~Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r529783812 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication { val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None -val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) +val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, + loggingInterval, + waitForAppCompletion) Review comment: Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0. ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, -maybeLoggingInterval: Option[Long]) +maybeLoggingInterval: Option[Long], +waitForCompletion: Boolean) Review comment: Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523705398 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { +resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { logDebug(s"Stopping watching application $appId with last-observed phase $phase") -closeWatch() +if (e != null && e.getCode==HTTP_GONE) { Review comment: `e.getCode == HTTP_GONE`?
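For readers following the `onClose` hunk, here is a minimal sketch of the pattern it appears to introduce: treat an HTTP 410 (Gone) close as a signal to restart the watch, and treat anything else as a normal close. The hunk is cut off right after the `if`, so both branch bodies are assumptions. `StubClientException` and `StubWatcher` are hypothetical stand-ins, not the fabric8 or Spark classes.

```scala
import java.net.HttpURLConnection.HTTP_GONE

// Hypothetical stand-in for the client exception; only the status code matters here.
final class StubClientException(val code: Int) extends Exception(s"HTTP $code") {
  def getCode: Int = code
}

// Hypothetical watcher that separates "resource version too old" from a normal close.
class StubWatcher {
  private var resourceTooOldReceived = false
  private var closed = false

  def onClose(e: StubClientException): Unit = synchronized {
    if (e != null && e.getCode == HTTP_GONE) {
      // Assumed behaviour: remember the 410 so the caller can open a fresh watch.
      resourceTooOldReceived = true
    } else {
      // Assumed behaviour: any other close ends the watch for good.
      closed = true
    }
    notifyAll()
  }

  def needsRestart: Boolean = synchronized(resourceTooOldReceived)
  def isClosed: Boolean = synchronized(closed)
}
```

The null check comes first because a watch can be closed without an exception, in which case there is no status code to inspect.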
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523705352 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") Review comment: Oh, it's a compilation error.
```scala
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala:195: object appName is not a member of package conf
[error] logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
[error] ^
[error] one error found
```
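The error text says `conf` resolves to a package at that point, which suggests the class has no `conf` value in scope. The constructor shown elsewhere in this thread takes `appId: String`, and an earlier revision of the same log line interpolated `${appId}`. A sketch of the message built from that parameter, assuming that is the intended fix (`WaitMessage` is a hypothetical class used only for illustration):

```scala
// Hypothetical helper; the real code logs directly from LoggingPodStatusWatcherImpl.
class WaitMessage(appId: String) {
  // Uses the constructor parameter that is actually in scope instead of `conf.appName`.
  def waiting(sId: String): String =
    s"Waiting for application $appId with submission ID $sId to finish..."
}
```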
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704953 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala ## @@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { loggingPodStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() -verify(loggingPodStatusWatcher).awaitCompletion() +verify(loggingPodStatusWatcher).watchOrStop("default:driver") Review comment: `default` -> `kubernetesConf.namespace()`?
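The suggestion is easier to see next to the expression the production code uses for the submission ID, `Seq(kubernetesConf.namespace(), driverPodName).mkString(":")`, quoted in the `KubernetesClientApplication.scala` hunks of this thread. A small sketch of deriving the expected value in a test the same way rather than hard-coding it; `SubmissionId` is a hypothetical helper, not something in the PR:

```scala
// Hypothetical helper mirroring the expression quoted in the diff.
object SubmissionId {
  def of(namespace: String, driverPodName: String): String =
    Seq(namespace, driverPodName).mkString(":")
}
```

With a helper like this, a test would build the argument from the configured namespace, so it keeps passing if the namespace used by the suite ever changes.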
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704833 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") +val interval = maybeLoggingInterval + +synchronized { + while (!podCompleted && !resourceTooOldReceived) { +wait(interval.get) +logInfo(s"Application status for $appId (phase: $phase)") + } +} + +if(podCompleted) { + logInfo( +pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") +} else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") +} + +logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") + +podCompleted + } else { +logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") Review comment: Instead of `${appId}`, the other branches seem to use `${conf.appName}`, don't they?
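The waiting branch of `watchOrStop` in the hunk above boils down to a monitor loop over two flags. Below is a stripped-down sketch of that loop with logging removed, assuming the flags are set from the watcher callbacks under the same lock. `CompletionMonitor` and its `mark*` methods are hypothetical names; only `podCompleted`, `resourceTooOldReceived`, `reset` and `watchOrStop` come from the diff.

```scala
// Hypothetical class isolating the wait loop from the quoted watchOrStop.
class CompletionMonitor(intervalMs: Long) {
  private var podCompleted = false
  private var resourceTooOldReceived = false

  def markCompleted(): Unit = synchronized { podCompleted = true; notifyAll() }
  def markResourceTooOld(): Unit = synchronized { resourceTooOldReceived = true; notifyAll() }
  def reset(): Unit = synchronized { resourceTooOldReceived = false }

  // Returns true if the pod completed, false if the watch has to be re-created.
  def watchOrStop(): Boolean = synchronized {
    while (!podCompleted && !resourceTooOldReceived) {
      wait(intervalMs) // wakes on notifyAll() or when the interval elapses
    }
    podCompleted
  }
}
```

The return value is what lets the caller tell "finished" apart from "watch expired", which is the distinction the surrounding restart loop relies on.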
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704798 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") +val interval = maybeLoggingInterval + +synchronized { + while (!podCompleted && !resourceTooOldReceived) { +wait(interval.get) +logInfo(s"Application status for $appId (phase: $phase)") + } +} + +if(podCompleted) { + logInfo( +pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") +} else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") +} + +logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") + +podCompleted + } else { +logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes") +logInfo(s"It seems we end up here, because we never want to wait for completion...") Review comment: Please remove this.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704723 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") +val interval = maybeLoggingInterval + +synchronized { + while (!podCompleted && !resourceTooOldReceived) { +wait(interval.get) +logInfo(s"Application status for $appId (phase: $phase)") + } +} + +if(podCompleted) { + logInfo( +pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") +} else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") Review comment: It seems that we don't have this in the other branches. Please remove this.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704750 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") +val interval = maybeLoggingInterval + +synchronized { + while (!podCompleted && !resourceTooOldReceived) { +wait(interval.get) +logInfo(s"Application status for $appId (phase: $phase)") + } +} + +if(podCompleted) { + logInfo( +pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } + .getOrElse("No containers were found in the driver pod.")) + logInfo(s"Application ${appId} with submission ID $sId finished") +} else { + logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.") +} + +logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}") Review comment: Please remove this.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704501 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { Review comment: Shall we move this to line 69 (after `private def phase`) like the other branches?
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704394 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -28,8 +29,10 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils + Review comment: Let's remove this to minimize diff and to be consistent.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704337 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + +watcher.reset() + +watch = podWithName.watch(watcher) + +watcher.eventReceived(Action.MODIFIED, podWithName.get()) + +if(watcher.watchOrStop(sId)) { + logInfo(s"Stop watching 
as the pod has completed.") Review comment: Let's remove this. Otherwise, it exists only in `branch-2.4`, and people may be confused and take it for a regression in `branch-3.0` and `master`.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704197 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + +watcher.reset() + +watch = podWithName.watch(watcher) + +watcher.eventReceived(Action.MODIFIED, podWithName.get()) + Review comment: Please add the following comment like the 
other branches.
```
// Break the while loop if the pod is completed or we don't want to wait
```
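For context, the loop this comment belongs to can be sketched in isolation as follows. This is an illustration of the control flow in the quoted hunk, not the code from the PR. `RestartableWatcher` is a hypothetical, simplified interface, and `startWatch` stands in for `podWithName.watch(watcher)`.

```scala
import scala.util.control.Breaks.{break, breakable}

// Hypothetical, simplified watcher interface; the real trait extends Watcher[Pod].
trait RestartableWatcher {
  def reset(): Unit
  def watchOrStop(sId: String): Boolean
}

object WatchLoop {
  // Re-creates the watch until watchOrStop reports completion; returns how many watches were opened.
  def run(watcher: RestartableWatcher, sId: String)(startWatch: () => Unit): Int = {
    var opened = 0
    breakable {
      while (true) {
        watcher.reset()
        startWatch()
        opened += 1
        // Break the while loop if the pod is completed or we don't want to wait
        if (watcher.watchOrStop(sId)) {
          break()
        }
      }
    }
    opened
  }
}
```

A `false` from `watchOrStop` sends the loop around again, so each expired watch is replaced by a new one until the pod completes.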
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704122 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + +watcher.reset() + Review comment: Let's remove this empty line like the other branch.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704157 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + +watcher.reset() + +watch = podWithName.watch(watcher) + Review comment: Shall we add the following comments like the other branches? 
```
// Send the latest pod state we know to the watcher to make sure we didn't miss anything
```
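One way to read that comment: a state change that happens before the watch is open never arrives as an event, so the caller replays the current state once the watch is in place. Below is a toy sketch of the catch-up step, assuming `currentPhase` plays the role of `podWithName.get()`. `PhaseWatcher` and `CatchUp` are hypothetical, and phases are plain strings here rather than `Pod` objects.

```scala
// Hypothetical watcher that only tracks the last phase it was told about.
class PhaseWatcher {
  private var phase = "unknown"
  def eventReceived(newPhase: String): Unit = synchronized { phase = newPhase }
  def lastPhase: String = synchronized(phase)
}

object CatchUp {
  // currentPhase stands in for fetching the pod's present state from the API server.
  def startAndCatchUp(watcher: PhaseWatcher, currentPhase: () => String): Unit = {
    // A real implementation would open the watch here, before the replay below.
    watcher.eventReceived(currentPhase())
  }
}
```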
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704122 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + +watcher.reset() + Review comment: Let's remove this empty line.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r523704113 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ## @@ -133,29 +136,37 @@ private[spark] class Client( .endVolume() .endSpec() .build() -Utils.tryWithResource( - kubernetesClient -.pods() -.withName(resolvedDriverPod.getMetadata.getName) -.watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) - try { -val otherKubernetesResources = - resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) -addDriverOwnerReference(createdDriverPod, otherKubernetesResources) -kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() - } catch { -case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) - throw e - } - if (waitForAppCompletion) { -logInfo(s"Waiting for application $appName to finish...") -watcher.awaitCompletion() -logInfo(s"Application $appName finished.") - } else { -logInfo(s"Deployed Spark application $appName into Kubernetes.") +val driverPodName = resolvedDriverPod.getMetadata.getName +var watch: Watch = null +val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) +try { + val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) + addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() +} catch { + case NonFatal(e) => +kubernetesClient.pods().delete(createdDriverPod) +throw e +} +val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":") +breakable { + while (true) { +val podWithName = kubernetesClient + .pods() + .withName(driverPodName) + Review comment: Could you add the following comments like the other branches? 
```
// Reset resource to old before we start the watch, this is important for race conditions
```
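A plausible reading of the race that comment mentions: if the flag were cleared after the watch is opened, a Gone notification delivered in between would be wiped and the loop would keep waiting on a dead watch. The deterministic sketch below contrasts the two orderings, with the event delivery simulated by a direct call. `GoneFlag` and `ResetOrdering` are hypothetical names and the scenario is an assumption, not something stated in the thread.

```scala
// Hypothetical holder for the resourceTooOldReceived flag.
class GoneFlag {
  private var resourceTooOldReceived = false
  def onGone(): Unit = synchronized { resourceTooOldReceived = true }
  def reset(): Unit = synchronized { resourceTooOldReceived = false }
  def isSet: Boolean = synchronized(resourceTooOldReceived)
}

object ResetOrdering {
  // Ordering used in the diff: clear the flag, then start the watch.
  def resetThenWatch(flag: GoneFlag): Boolean = {
    flag.reset()
    flag.onGone() // simulated: the new watch reports Gone right away
    flag.isSet    // the event survives, so the loop can restart the watch
  }

  // Reversed ordering: the early event is lost.
  def watchThenReset(flag: GoneFlag): Boolean = {
    flag.onGone() // simulated: Gone arrives before the reset runs
    flag.reset()
    flag.isSet    // the event has been wiped
  }
}
```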
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520197714 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala ## @@ -151,6 +151,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) +when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true) Review comment: Maybe, `kubernetesConf.namespace()` instead of `"default"`?
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520196771 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +187,35 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Patched Sept 8th: Waiting for application" + + s" ${appId} with submission ID $sId to finish...") +val interval = maybeLoggingInterval + +synchronized { + while (!podCompleted && !resourceTooOldReceived) { +wait(interval.get) +logDebug(s"Application status for $appId (phase: $phase)") Review comment: This should be `logInfo`.
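The wait loop in the hunk above follows the usual monitor pattern: block on the object's lock, wake up either on a timeout (to log status) or on a notification, and re-check the flags. A minimal sketch of that pattern, assuming the method reports whether the pod completed; `CompletionLatch` and its `mark*` methods are hypothetical names used only for illustration and are not part of the patch:

```scala
// Hypothetical illustration of the synchronized wait loop; not the Spark class.
final class CompletionLatch(loggingIntervalMs: Long) {
  private var podCompleted = false
  private var resourceTooOldReceived = false

  // Called from the watch callback thread when the pod reaches a terminal phase.
  def markCompleted(): Unit = synchronized {
    podCompleted = true
    notifyAll()
  }

  // Called from the watch callback thread when the watch is invalidated.
  def markResourceTooOld(): Unit = synchronized {
    resourceTooOldReceived = true
    notifyAll()
  }

  def reset(): Unit = synchronized {
    resourceTooOldReceived = false
  }

  // Blocks until either flag is set. Assumption: true means the pod completed,
  // false means the caller should restart the watch.
  def watchOrStop(): Boolean = synchronized {
    while (!podCompleted && !resourceTooOldReceived) {
      // Wakes up on notifyAll() or after the logging interval, whichever comes first.
      wait(loggingIntervalMs)
    }
    podCompleted
  }
}
```

Because the flags are re-checked in a `while` loop, spurious wake-ups and timeout wake-ups are harmless: the loop simply waits again.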
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520196402 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -177,4 +187,35 @@ private[k8s] class LoggingPodStatusWatcherImpl( private def formatTime(time: String): String = { if (time != null || time != "") time else "N/A" } + + override def watchOrStop(sId: String): Boolean = if (waitForCompletion) { +logInfo(s"Patched Sept 8th: Waiting for application" + Review comment: `Patched Sept 8th:`?
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520196221 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -134,13 +151,6 @@ private[k8s] class LoggingPodStatusWatcherImpl( }.mkString("") } - override def awaitCompletion(): Unit = { -podCompletedFuture.await() -logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" -}.getOrElse("No containers were found in the driver pod.")) - } - Review comment: This removal looks like part of an independent PR rather than part of SPARK-24266. Could you tell us why this is required and where this came from?
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520195743 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { +resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { -logDebug(s"Stopping watching application $appId with last-observed phase $phase") -closeWatch() +logInfo(s"Stopping watching application $appId with last-observed phase $phase") +if (e != null && e.getCode==HTTP_GONE) { + resourceTooOldReceived = true + logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") +} else { + logInfo(s"Got proper termination code, closing watcher.") Review comment: Please remove this.
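The `onClose` branch in this hunk distinguishes an HTTP 410 (Gone) close, which signals that the watched resource version is too old and the watch should be restarted, from any other close. A minimal sketch of that decision with the extra log lines left out, as the review asks; `CloseCause` and `CloseHandler` are hypothetical stand-ins for the exception and watcher types, and only `java.net.HttpURLConnection.HTTP_GONE` is a real JDK constant:

```scala
import java.net.HttpURLConnection.HTTP_GONE

// Hypothetical stand-in for an exception type that exposes an HTTP status code.
final case class CloseCause(getCode: Int)

// Hypothetical illustration of the close handling; not the Spark class.
final class CloseHandler {
  private var resourceTooOldReceived = false
  private var watchClosed = false

  def onClose(e: CloseCause): Unit = synchronized {
    if (e != null && e.getCode == HTTP_GONE) {
      // 410 Gone: the resource version is stale, so ask the caller to restart.
      resourceTooOldReceived = true
    } else {
      // Any other close, including a null cause, ends the watch for good.
      watchClosed = true
    }
    notifyAll()
  }

  def shouldRestart: Boolean = synchronized { resourceTooOldReceived }
  def isClosed: Boolean = synchronized { watchClosed }
}
```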
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520195691 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { +resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { -logDebug(s"Stopping watching application $appId with last-observed phase $phase") -closeWatch() +logInfo(s"Stopping watching application $appId with last-observed phase $phase") +if (e != null && e.getCode==HTTP_GONE) { + resourceTooOldReceived = true + logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") Review comment: ditto.
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520195541 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl( } } + override def reset(): Unit = { +resourceTooOldReceived = false + } + override def onClose(e: KubernetesClientException): Unit = { -logDebug(s"Stopping watching application $appId with last-observed phase $phase") -closeWatch() +logInfo(s"Stopping watching application $appId with last-observed phase $phase") Review comment: This is a regression because this was previously `logDebug`. (Of course, this is different from the `master` branch, too.)
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520195032 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -42,9 +45,12 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { */ private[k8s] class LoggingPodStatusWatcherImpl( appId: String, -maybeLoggingInterval: Option[Long]) +maybeLoggingInterval: Option[Long], +waitForCompletion: Boolean) extends LoggingPodStatusWatcher with Logging { + private var resourceTooOldReceived: Boolean = false + private var podCompleted = false Review comment: Ditto. If possible, shall we follow the order and spacing in the `master` branch? - https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala#L45-L48
[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
dongjoon-hyun commented on a change in pull request #30283: URL: https://github.com/apache/spark/pull/30283#discussion_r520194207 ## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ## @@ -28,8 +29,10 @@ import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.ThreadUtils + private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { - def awaitCompletion(): Unit + def reset(): Unit + def watchOrStop(sId: String): Boolean Review comment: Could you preserve the order from the [original patches](https://github.com/apache/spark/pull/28423/files#diff-ff85a8d70a121a39181ed5ddb1005f87bf677acbe0849defc9fc843bcef62df2R31) (master/branch-3.0)?
```scala
def watchOrStop(submissionId: String): Boolean
def reset(): Unit
```
The function declaration order is different, and the parameter name is not the same here (`sId` != `submissionId`).