[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-24 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r529783873



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
 appId: String,
-maybeLoggingInterval: Option[Long])
+maybeLoggingInterval: Option[Long],
+waitForCompletion: Boolean)

Review comment:
   ~Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.~

##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
 val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
-val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
+  loggingInterval,
+  waitForAppCompletion)

Review comment:
   ~Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-24 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r529783812



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
 val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
 val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
-val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
+  loggingInterval,
+  waitForAppCompletion)

Review comment:
   Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.

##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
 appId: String,
-maybeLoggingInterval: Option[Long])
+maybeLoggingInterval: Option[Long],
+waitForCompletion: Boolean)

Review comment:
   Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523705398



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }
   }
 
+  override def reset(): Unit = {
+resourceTooOldReceived = false
+  }
+
   override def onClose(e: KubernetesClientException): Unit = {
 logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-closeWatch()
+if (e != null && e.getCode==HTTP_GONE) {

Review comment:
   `e.getCode == HTTP_GONE`?
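
For context, the condition under review separates a watch that was closed because its starting resource version expired (HTTP 410 Gone) from any other close. A minimal sketch of that check, written with the spacing the reviewer suggests; `WatchCloseError` and `GoneCheck` are hypothetical names standing in for the fabric8 `KubernetesClientException` and the watcher, and `HTTP_GONE` is the constant from `java.net.HttpURLConnection` (410):

```scala
import java.net.HttpURLConnection.HTTP_GONE

// Hypothetical stand-in for the client exception passed to onClose;
// only the HTTP status code matters here. It may be null on a normal close.
final case class WatchCloseError(code: Int)

object GoneCheck {
  // True when the watch ended because its resource version is too old,
  // meaning the caller should start a fresh watch instead of giving up.
  def isResourceTooOld(e: WatchCloseError): Boolean =
    e != null && e.code == HTTP_GONE
}
```

A `null` argument models a watch closed without an error, which this sketch treats as an ordinary close.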








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523705352



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")

Review comment:
   Oh, it's a compilation error.
   ```scala
   [error] /home/jenkins/workspace/SparkPullRequestBuilder@2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala:195: object appName is not a member of package conf
   [error] logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
   [error]  ^
   [error] one error found
   ```








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704953



##
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
##
@@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   loggingPodStatusWatcher,
   KUBERNETES_RESOURCE_PREFIX)
 submissionClient.run()
-verify(loggingPodStatusWatcher).awaitCompletion()
+verify(loggingPodStatusWatcher).watchOrStop("default:driver")

Review comment:
   `default` -> `kubernetesConf.namespace()`?
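
The submission ID in this PR is built as `Seq(kubernetesConf.namespace(), driverPodName).mkString(":")`, so deriving the expected value from the configured namespace keeps the test correct when the namespace is not `default`. A small sketch of that derivation; `SubmissionId` is a hypothetical helper, not part of Spark:

```scala
// Hypothetical helper mirroring how the PR composes the submission ID
// passed to watchOrStop: "<namespace>:<driver pod name>".
object SubmissionId {
  def apply(namespace: String, driverPodName: String): String =
    Seq(namespace, driverPodName).mkString(":")
}
```

In the suite, the verified argument would then be computed from the same conf the client uses instead of being hard-coded.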








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704833



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
+val interval = maybeLoggingInterval
+
+synchronized {
+  while (!podCompleted && !resourceTooOldReceived) {
+wait(interval.get)
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+}
+
+if(podCompleted) {
+  logInfo(
+pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+  .getOrElse("No containers were found in the driver pod."))
+  logInfo(s"Application ${appId} with submission ID $sId finished")
+} else {
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")
+}
+
+logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}")
+
+podCompleted
+  } else {
+logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")

Review comment:
   Instead of `${appId}`, the other branches seem to use `${conf.appName}`, don't they?








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704798



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
+val interval = maybeLoggingInterval
+
+synchronized {
+  while (!podCompleted && !resourceTooOldReceived) {
+wait(interval.get)
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+}
+
+if(podCompleted) {
+  logInfo(
+pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+  .getOrElse("No containers were found in the driver pod."))
+  logInfo(s"Application ${appId} with submission ID $sId finished")
+} else {
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")
+}
+
+logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}")
+
+podCompleted
+  } else {
+logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
+logInfo(s"It seems we end up here, because we never want to wait for completion...")

Review comment:
   Please remove this.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704723



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
+val interval = maybeLoggingInterval
+
+synchronized {
+  while (!podCompleted && !resourceTooOldReceived) {
+wait(interval.get)
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+}
+
+if(podCompleted) {
+  logInfo(
+pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+  .getOrElse("No containers were found in the driver pod."))
+  logInfo(s"Application ${appId} with submission ID $sId finished")
+} else {
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")

Review comment:
   It seems that we don't have this in the other branches. Please remove this.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704750



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
+val interval = maybeLoggingInterval
+
+synchronized {
+  while (!podCompleted && !resourceTooOldReceived) {
+wait(interval.get)
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+}
+
+if(podCompleted) {
+  logInfo(
+pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+  .getOrElse("No containers were found in the driver pod."))
+  logInfo(s"Application ${appId} with submission ID $sId finished")
+} else {
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")
+}
+
+logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}")

Review comment:
   Please remove this.
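
The loop in `watchOrStop` is a guarded wait: the submitting thread waits on the watcher's monitor until either the pod completes or the watch reports HTTP 410, and the return value tells the caller whether to start a new watch. A simplified, hypothetical model of that state (the class name, the `notifyAll` calls, and the omission of logging are assumptions of this sketch, not the PR's code):

```scala
// Hypothetical, simplified model of the watcher state in this PR. Both flags
// are only read or written while holding this object's monitor.
class PodStatusLatch(intervalMs: Long) {
  private var podCompleted = false
  private var resourceTooOldReceived = false

  // Called before every (re)watch; clears only the "too old" flag.
  def reset(): Unit = synchronized { resourceTooOldReceived = false }

  // Would be called from the watch callback thread.
  def markCompleted(): Unit = synchronized {
    podCompleted = true
    notifyAll()
  }

  def markResourceTooOld(): Unit = synchronized {
    resourceTooOldReceived = true
    notifyAll()
  }

  // Blocks until the pod completes or a 410 arrives, waking at least every
  // intervalMs (where the real watcher logs the current phase). Returns true
  // only if the pod completed, i.e. the caller can stop re-watching.
  def watchOrStop(): Boolean = synchronized {
    while (!podCompleted && !resourceTooOldReceived) {
      wait(intervalMs)
    }
    podCompleted
  }
}
```

Because `wait` times out after `intervalMs`, a flag change is still observed at the next interval even without a notification; the `notifyAll` calls only shorten that delay.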








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704501



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }
   }
 
+  override def reset(): Unit = {

Review comment:
   Shall we move this to line 69 (after `private def phase`) like the other branches?








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704394



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -28,8 +29,10 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
+

Review comment:
   Let's remove this to minimize diff and to be consistent.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704337



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+
+watcher.reset()
+
+watch = podWithName.watch(watcher)
+
+watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+if(watcher.watchOrStop(sId)) {
+  logInfo(s"Stop watching as the pod has completed.")

Review comment:
   Let's remove this. Otherwise, this message would exist only in `branch-2.4`, and people could be confused and mistake its absence in `branch-3.0` and `master` for a regression.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704197



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+
+watcher.reset()
+
+watch = podWithName.watch(watcher)
+
+watcher.eventReceived(Action.MODIFIED, podWithName.get())
+

Review comment:
   Please add the following comment like the other branches.
   ```
   // Break the while loop if the pod is completed or we don't want to wait
   ```
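
The requested comment describes the exit condition of the restart loop. That control flow can be sketched independently of the Kubernetes client; `RestartLoop` and `watchOnce` are hypothetical, with `watchOnce` standing for one `reset()` / `watch(watcher)` / `watchOrStop(sId)` round that returns `true` once the pod has completed (or when the caller does not want to wait):

```scala
import scala.util.control.Breaks.{break, breakable}

object RestartLoop {
  // Repeats watch rounds until one reports completion; returns the number of
  // rounds, so a count above 1 means at least one HTTP 410 restart happened.
  def run(watchOnce: () => Boolean): Int = {
    var rounds = 0
    breakable {
      while (true) {
        rounds += 1
        // Break the while loop if the pod is completed or we don't want to wait
        if (watchOnce()) break()
      }
    }
    rounds
  }
}
```

This sketch leaves out creating and closing the `Watch` objects, which the real client has to manage on each round.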








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704122



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+
+watcher.reset()
+

Review comment:
   Let's remove this empty line like the other branch.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704157



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+
+watcher.reset()
+
+watch = podWithName.watch(watcher)
+

Review comment:
   Shall we add the following comments like the other branches?
   ```
   // Send the latest pod state we know to the watcher to make sure we didn't miss anything
   ```
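
The rationale behind that comment is to avoid missing a state change that happened while no watch was active, for example between a 410 close and the next `watch(...)` call. A hypothetical, minimal model of why re-sending the current pod object helps when the pod has already finished; `PodSnapshot` and `PhaseWatcher` are illustrative names, and the completion rule (phase `Succeeded` or `Failed`) follows the Kubernetes pod phase definitions:

```scala
// Hypothetical minimal pod model; the real code passes a fabric8 Pod
// together with Watcher.Action.MODIFIED.
final case class PodSnapshot(phase: String)

class PhaseWatcher {
  @volatile private var lastPhase: Option[String] = None

  // Idempotent: re-sending a state the watcher already saw is harmless.
  def eventReceived(pod: PodSnapshot): Unit = lastPhase = Some(pod.phase)

  // Terminal pod phases.
  def completed: Boolean =
    lastPhase.exists(p => p == "Succeeded" || p == "Failed")
}
```

If the driver pod reached `Succeeded` while no watch was registered, feeding the current snapshot lets the watcher observe the terminal phase immediately instead of waiting for an event that may never come.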








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704122



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+
+watcher.reset()
+

Review comment:
   Let's remove this empty line.








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-14 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r523704113



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
##
@@ -133,29 +136,37 @@ private[spark] class Client(
   .endVolume()
 .endSpec()
   .build()
-Utils.tryWithResource(
-  kubernetesClient
-.pods()
-.withName(resolvedDriverPod.getMetadata.getName)
-.watch(watcher)) { _ =>
-  val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-  try {
-val otherKubernetesResources =
-  resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-  } catch {
-case NonFatal(e) =>
-  kubernetesClient.pods().delete(createdDriverPod)
-  throw e
-  }
 
-  if (waitForAppCompletion) {
-logInfo(s"Waiting for application $appName to finish...")
-watcher.awaitCompletion()
-logInfo(s"Application $appName finished.")
-  } else {
-logInfo(s"Deployed Spark application $appName into Kubernetes.")
+val driverPodName = resolvedDriverPod.getMetadata.getName
+var watch: Watch = null
+val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+try {
+  val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+  addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+  kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+} catch {
+  case NonFatal(e) =>
+kubernetesClient.pods().delete(createdDriverPod)
+throw e
+}
+val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+breakable {
+  while (true) {
+val podWithName = kubernetesClient
+  .pods()
+  .withName(driverPodName)
+

Review comment:
   Could you add the following comment, as in the other branches?
   ```
   // Reset resource to old before we start the watch, this is important for race conditions
   ```
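A minimal sketch of where that comment lands, assuming a cut-down watcher that exposes only `reset()` and `watchOrStop()`. The names `RestartableWatcher`, `FakeWatcher` and `runUntilDone` are hypothetical, and the fake merely simulates a given number of HTTP 410 (Gone) closes; in the actual client, the fabric8 watch would be (re)opened at the point marked in the loop.

```scala
import scala.util.control.Breaks.{break, breakable}

// Hypothetical, cut-down stand-in for LoggingPodStatusWatcher.
trait RestartableWatcher {
  def reset(): Unit
  def watchOrStop(submissionId: String): Boolean
}

// Hypothetical fake: the first `goneCount` calls to watchOrStop behave as if
// the watch was closed with HTTP 410 (Gone); the next call reports completion.
final class FakeWatcher(goneCount: Int) extends RestartableWatcher {
  private var remainingGone = goneCount
  var resets = 0
  var watches = 0

  override def reset(): Unit = resets += 1

  override def watchOrStop(submissionId: String): Boolean = {
    watches += 1
    if (remainingGone > 0) {
      remainingGone -= 1 // pretend onClose just saw HTTP 410 (Gone)
      false              // not finished: the caller must watch again
    } else true          // pod finished (or we are not waiting for it)
  }
}

def runUntilDone(watcher: RestartableWatcher, submissionId: String): Unit =
  breakable {
    while (true) {
      // Reset resource to old before we start the watch, this is important for race conditions
      watcher.reset()
      // (The real client would open the pod watch here, before waiting on it.)
      if (watcher.watchOrStop(submissionId)) break()
    }
  }

val w = new FakeWatcher(goneCount = 2)
runUntilDone(w, "default:spark-driver")
println(s"watches=${w.watches}, resets=${w.resets}")
```

In the diff, `reset()` clears `resourceTooOldReceived`; calling it before every new watch keeps a 410 flag left over from the previous watch from making the next `watchOrStop` return immediately.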





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520197714



##
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
##
@@ -151,6 +151,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
 
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
 when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true)

Review comment:
   Maybe use `kubernetesConf.namespace()` instead of `"default"`?





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520196771



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +187,35 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Patched Sept 8th: Waiting for application" +
+  s" ${appId} with submission ID $sId to finish...")
+val interval = maybeLoggingInterval
+
+synchronized {
+  while (!podCompleted && !resourceTooOldReceived) {
+wait(interval.get)
+logDebug(s"Application status for $appId (phase: $phase)")

Review comment:
   This should be `logInfo`.
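The wait loop under review is a guarded wait on the object's monitor. The following self-contained model is a sketch, not the Spark class: `CompletionMonitor`, `markCompleted` and `markResourceTooOld` are hypothetical names, and `println` stands in for `logInfo`. Because `wait(intervalMs)` times out, the loop wakes once per interval to report status even when no callback fires.

```scala
// Hypothetical model of the guarded wait in watchOrStop (not the Spark class).
final class CompletionMonitor(intervalMs: Long) {
  private var podCompleted = false
  private var resourceTooOldReceived = false
  private var phase = "Pending"

  // Watch-thread callback: the driver pod reached a terminal phase.
  def markCompleted(finalPhase: String): Unit = synchronized {
    phase = finalPhase
    podCompleted = true
    notifyAll()
  }

  // Watch-thread callback: onClose saw HTTP 410 (Gone).
  def markResourceTooOld(): Unit = synchronized {
    resourceTooOldReceived = true
    notifyAll()
  }

  // Clear the too-old flag before a new watch is opened.
  def reset(): Unit = synchronized { resourceTooOldReceived = false }

  // Blocks until the pod completes (true) or the watch expired (false).
  def watchOrStop(submissionId: String): Boolean = synchronized {
    while (!podCompleted && !resourceTooOldReceived) {
      wait(intervalMs) // releases the monitor; wakes on notifyAll or after intervalMs
      // Periodic status line; println stands in for logInfo.
      println(s"Application status for $submissionId (phase: $phase)")
    }
    podCompleted
  }
}
```

This model calls `notifyAll()` from both callbacks so the waiter wakes immediately; without it, the timed wait would still observe the flag on its next wake-up, up to one interval later.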





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520196402



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -177,4 +187,35 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
 if (time != null ||  time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+logInfo(s"Patched Sept 8th: Waiting for application" +

Review comment:
   `Patched Sept 8th:`?





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520196221



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -134,13 +151,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }.mkString("")
   }
 
-  override def awaitCompletion(): Unit = {
-podCompletedFuture.await()
-logInfo(pod.map { p =>
-  s"Container final statuses:\n\n${containersDescription(p)}"
-}.getOrElse("No containers were found in the driver pod."))
-  }
-

Review comment:
   This removal looks like part of an independent PR rather than part of SPARK-24266. Could you tell us why this is required and where it came from?





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520195743



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }
   }
 
+  override def reset(): Unit = {
+resourceTooOldReceived = false
+  }
+
   override def onClose(e: KubernetesClientException): Unit = {
-logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-closeWatch()
+logInfo(s"Stopping watching application $appId with last-observed phase $phase")
+if (e != null && e.getCode==HTTP_GONE) {
+  resourceTooOldReceived = true
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+} else {
+  logInfo(s"Got proper termination code, closing watcher.")

Review comment:
   Please remove this.
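The `onClose` branch under review only has to tell an expired resource version (HTTP 410 Gone) apart from every other close. A hedged sketch of that decision as a pure function: `CloseOutcome`, `RestartWatch`, `StopWatching` and `classifyClose` are hypothetical names, and taking `HTTP_GONE` from `java.net.HttpURLConnection` is an assumption about the constant used in the diff. In a real callback the code would come from the exception, for example `Option(e).map(_.getCode)`, so a `null` exception maps to `None`.

```scala
import java.net.HttpURLConnection.HTTP_GONE

// Hypothetical result of inspecting a closed watch.
sealed trait CloseOutcome
case object RestartWatch extends CloseOutcome // resource version expired: reset and watch again
case object StopWatching extends CloseOutcome // normal close or any other error

// errorCode is None when onClose receives a null exception (a normal close).
def classifyClose(errorCode: Option[Int]): CloseOutcome = errorCode match {
  case Some(HTTP_GONE) => RestartWatch // HTTP 410
  case _               => StopWatching
}
```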





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520195691



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }
   }
 
+  override def reset(): Unit = {
+resourceTooOldReceived = false
+  }
+
   override def onClose(e: KubernetesClientException): Unit = {
-logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-closeWatch()
+logInfo(s"Stopping watching application $appId with last-observed phase $phase")
+if (e != null && e.getCode==HTTP_GONE) {
+  resourceTooOldReceived = true
+  logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e")

Review comment:
   ditto.





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520195541



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -77,9 +83,19 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 }
   }
 
+  override def reset(): Unit = {
+resourceTooOldReceived = false
+  }
+
   override def onClose(e: KubernetesClientException): Unit = {
-logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-closeWatch()
+logInfo(s"Stopping watching application $appId with last-observed phase $phase")

Review comment:
   This is a regression because this was previously `logDebug`. (Of course, this is also different from the `master` branch.)





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520195032



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -42,9 +45,12 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
 appId: String,
-maybeLoggingInterval: Option[Long])
+maybeLoggingInterval: Option[Long],
+waitForCompletion: Boolean)
   extends LoggingPodStatusWatcher with Logging {
 
+  private var resourceTooOldReceived: Boolean = false
+  private var podCompleted = false

Review comment:
   ditto. If possible, shall we follow the order and spacing of the `master` branch?
   - https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala#L45-L48





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #30283: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s

2020-11-09 Thread GitBox


dongjoon-hyun commented on a change in pull request #30283:
URL: https://github.com/apache/spark/pull/30283#discussion_r520194207



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
##
@@ -28,8 +29,10 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
+
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def awaitCompletion(): Unit
+  def reset(): Unit
+  def watchOrStop(sId: String): Boolean

Review comment:
   Could you preserve the order from the [original patches](https://github.com/apache/spark/pull/28423/files#diff-ff85a8d70a121a39181ed5ddb1005f87bf677acbe0849defc9fc843bcef62df2R31) (master/branch-3.0)?
   ```scala
   def watchOrStop(submissionId: String): Boolean
   def reset(): Unit
   ```
   
   The function declaration order is different, and the parameter name is not the same here (`sId` != `submissionId`).




