Repository: spark
Updated Branches:
  refs/heads/master c84bc40d7 -> 27d3b0a51


[SPARK-25222][K8S] Improve container status logging

## What changes were proposed in this pull request?

Currently, when running Spark on Kubernetes, a logger is run by the client that 
watches the K8S API for events related to the driver pod and logs them.  
However, for the container status aspect of the logging, this simply dumps the 
raw object, which is not human readable, e.g.

![screen shot 2018-08-24 at 10 37 
46](https://user-images.githubusercontent.com/2104864/44577799-e0486880-a789-11e8-9ae9-fdeddacbbea8.png)
![screen shot 2018-08-24 at 10 38 
14](https://user-images.githubusercontent.com/2104864/44577800-e0e0ff00-a789-11e8-81f5-3bb315dbbdb1.png)

This is despite the fact that the logging class in question actually has 
methods to pretty print this information, but it only invokes these at the end 
of a job.

This PR improves the logging to always use the pretty printing methods, 
additionally modifying them to include further useful information provided by 
the K8S API.

A similar issue also exists when tasks are lost; that will be addressed by 
further commits to this PR.

- [x] Improved `LoggingPodStatusWatcher`
- [x] Improved container status on task failure

## How was this patch tested?

Built and launched jobs with the updated Spark client and observed the new 
human readable output:

![screen shot 2018-08-24 at 11 09 
32](https://user-images.githubusercontent.com/2104864/44579429-5353de00-a78e-11e8-9228-c750af8e6311.png)
![screen shot 2018-08-24 at 11 09 
42](https://user-images.githubusercontent.com/2104864/44579430-5353de00-a78e-11e8-8fce-d5bb2a3ae65f.png)
![screen shot 2018-08-24 at 11 10 
13](https://user-images.githubusercontent.com/2104864/44579431-53ec7480-a78e-11e8-9fa2-aeabc5b28ec4.png)
![screen shot 2018-08-24 at 17 47 
44](https://user-images.githubusercontent.com/2104864/44596922-db090f00-a7c5-11e8-910c-bc2339f5a196.png)

Suggested reviewers: liyinan926 mccheah

Author: Rob Vesse <rve...@dotnetrdf.org>

Closes #22215 from rvesse/SPARK-25222.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d3b0a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d3b0a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d3b0a5

Branch: refs/heads/master
Commit: 27d3b0a51cfd1caf05c242b45db9a78ef5868685
Parents: c84bc40
Author: Rob Vesse <rve...@dotnetrdf.org>
Authored: Thu Sep 6 16:15:11 2018 -0700
Committer: mcheah <mch...@palantir.com>
Committed: Thu Sep 6 16:15:11 2018 -0700

----------------------------------------------------------------------
 .../spark/deploy/k8s/KubernetesUtils.scala      | 83 +++++++++++++++++++-
 .../k8s/submit/LoggingPodStatusWatcher.scala    | 73 +----------------
 .../k8s/ExecutorPodsLifecycleManager.scala      |  9 ++-
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala |  9 ++-
 4 files changed, 95 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27d3b0a5/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 588cd9d..f5fae7c 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,7 +16,11 @@
  */
 package org.apache.spark.deploy.k8s
 
-import org.apache.spark.SparkConf
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 private[spark] object KubernetesUtils {
@@ -60,4 +64,81 @@ private[spark] object KubernetesUtils {
   }
 
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
+
+  def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : 
String = {
+    // Use more loggable format if value is null or empty
+    val indentStr = "\t" * indent
+    pairs.map {
+      case (k, v) => s"\n$indentStr $k: 
${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
+    }.mkString("")
+  }
+
+  /**
+   * Given a pod, output a human readable representation of its state
+   *
+   * @param pod Pod
+   * @return Human readable pod state
+   */
+  def formatPodState(pod: Pod): String = {
+    val details = Seq[(String, String)](
+      // pod metadata
+      ("pod name", pod.getMetadata.getName),
+      ("namespace", pod.getMetadata.getNamespace),
+      ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
+      ("pod uid", pod.getMetadata.getUid),
+      ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
+
+      // spec details
+      ("service account name", pod.getSpec.getServiceAccountName),
+      ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", 
")),
+      ("node name", pod.getSpec.getNodeName),
+
+      // status
+      ("start time", formatTime(pod.getStatus.getStartTime)),
+      ("phase", pod.getStatus.getPhase),
+      ("container status", containersDescription(pod, 2))
+    )
+
+    formatPairsBundle(details)
+  }
+
+  def containersDescription(p: Pod, indent: Int = 1): String = {
+    p.getStatus.getContainerStatuses.asScala.map { status =>
+      Seq(
+        ("container name", status.getName),
+        ("container image", status.getImage)) ++
+        containerStatusDescription(status)
+    }.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
+  }
+
+  def containerStatusDescription(containerStatus: ContainerStatus)
+    : Seq[(String, String)] = {
+    val state = containerStatus.getState
+    Option(state.getRunning)
+      .orElse(Option(state.getTerminated))
+      .orElse(Option(state.getWaiting))
+      .map {
+        case running: ContainerStateRunning =>
+          Seq(
+            ("container state", "running"),
+            ("container started at", formatTime(running.getStartedAt)))
+        case waiting: ContainerStateWaiting =>
+          Seq(
+            ("container state", "waiting"),
+            ("pending reason", waiting.getReason))
+        case terminated: ContainerStateTerminated =>
+          Seq(
+            ("container state", "terminated"),
+            ("container started at", formatTime(terminated.getStartedAt)),
+            ("container finished at", formatTime(terminated.getFinishedAt)),
+            ("exit code", terminated.getExitCode.toString),
+            ("termination reason", terminated.getReason))
+        case unknown =>
+          throw new SparkException(s"Unexpected container status type 
${unknown.getClass}.")
+      }.getOrElse(Seq(("container state", "N/A")))
+  }
+
+  def formatTime(time: Time): String = {
+    if (time != null) time.getTime else "N/A"
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/27d3b0a5/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 173ac54..1889fe5 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -25,6 +25,7 @@ import 
io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
@@ -99,82 +100,10 @@ private[k8s] class LoggingPodStatusWatcherImpl(
     scheduler.shutdown()
   }
 
-  private def formatPodState(pod: Pod): String = {
-    val details = Seq[(String, String)](
-      // pod metadata
-      ("pod name", pod.getMetadata.getName),
-      ("namespace", pod.getMetadata.getNamespace),
-      ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
-      ("pod uid", pod.getMetadata.getUid),
-      ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
-
-      // spec details
-      ("service account name", pod.getSpec.getServiceAccountName),
-      ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", 
")),
-      ("node name", pod.getSpec.getNodeName),
-
-      // status
-      ("start time", formatTime(pod.getStatus.getStartTime)),
-      ("container images",
-        pod.getStatus.getContainerStatuses
-          .asScala
-          .map(_.getImage)
-          .mkString(", ")),
-      ("phase", pod.getStatus.getPhase),
-      ("status", pod.getStatus.getContainerStatuses.toString)
-    )
-
-    formatPairsBundle(details)
-  }
-
-  private def formatPairsBundle(pairs: Seq[(String, String)]) = {
-    // Use more loggable format if value is null or empty
-    pairs.map {
-      case (k, v) => s"\n\t $k: 
${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
-    }.mkString("")
-  }
-
   override def awaitCompletion(): Unit = {
     podCompletedFuture.await()
     logInfo(pod.map { p =>
       s"Container final statuses:\n\n${containersDescription(p)}"
     }.getOrElse("No containers were found in the driver pod."))
   }
-
-  private def containersDescription(p: Pod): String = {
-    p.getStatus.getContainerStatuses.asScala.map { status =>
-      Seq(
-        ("Container name", status.getName),
-        ("Container image", status.getImage)) ++
-        containerStatusDescription(status)
-    }.map(formatPairsBundle).mkString("\n\n")
-  }
-
-  private def containerStatusDescription(
-      containerStatus: ContainerStatus): Seq[(String, String)] = {
-    val state = containerStatus.getState
-    Option(state.getRunning)
-      .orElse(Option(state.getTerminated))
-      .orElse(Option(state.getWaiting))
-      .map {
-        case running: ContainerStateRunning =>
-          Seq(
-            ("Container state", "Running"),
-            ("Container started at", formatTime(running.getStartedAt)))
-        case waiting: ContainerStateWaiting =>
-          Seq(
-            ("Container state", "Waiting"),
-            ("Pending reason", waiting.getReason))
-        case terminated: ContainerStateTerminated =>
-          Seq(
-            ("Container state", "Terminated"),
-            ("Exit code", terminated.getExitCode.toString))
-        case unknown =>
-          throw new SparkException(s"Unexpected container status type 
${unknown.getClass}.")
-        }.getOrElse(Seq(("Container state", "N/A")))
-  }
-
-  private def formatTime(time: Time): String = {
-    if (time != null) time.getTime else "N/A"
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/27d3b0a5/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index b28d939..e2800cf 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
@@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private def exitReasonMessage(podState: FinalPodState, execId: Long, 
exitCode: Int) = {
     val pod = podState.pod
+    val reason = Option(pod.getStatus.getReason)
+    val message = Option(pod.getStatus.getMessage)
     s"""
        |The executor with id $execId exited with exit code $exitCode.
-       |The API gave the following brief reason: ${pod.getStatus.getReason}
-       |The API gave the following message: ${pod.getStatus.getMessage}
+       |The API gave the following brief reason: ${reason.getOrElse("N/A")}
+       |The API gave the following message: ${message.getOrElse("N/A")}
        |The API gave the following container statuses:
        |
-       
|${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+       |${containersDescription(pod)}
       """.stripMargin
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27d3b0a5/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 562ace9..d840938 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -31,6 +31,7 @@ import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
 
@@ -104,13 +105,15 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
   }
 
   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String 
= {
+    val reason = Option(failedPod.getStatus.getReason)
+    val message = Option(failedPod.getStatus.getMessage)
     s"""
        |The executor with id $failedExecutorId exited with exit code 1.
-       |The API gave the following brief reason: 
${failedPod.getStatus.getReason}
-       |The API gave the following message: ${failedPod.getStatus.getMessage}
+       |The API gave the following brief reason: ${reason.getOrElse("N/A")}
+       |The API gave the following message: ${message.getOrElse("N/A")}
        |The API gave the following container statuses:
        |
-       
|${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+       |${containersDescription(failedPod)}
       """.stripMargin
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to