This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 324275a [SPARK-33185][YARN] Set up yarn.Client to print direct links to driver stdout/stderr

324275a is described below

commit 324275ae8350ec15844ce384f40f1ecc4acdc072
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Thu Nov 5 12:38:42 2020 -0600

[SPARK-33185][YARN] Set up yarn.Client to print direct links to driver stdout/stderr

### What changes were proposed in this pull request?

Currently when run in `cluster` mode on YARN, the Spark `yarn.Client` will print out the application report into the logs, to be easily viewed by users. For example:
```
INFO yarn.Client:
     client token: Token { kind: YARN_CLIENT_TOKEN, service: }
     diagnostics: N/A
     ApplicationMaster host: X.X.X.X
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1602782566027
     final status: UNDEFINED
     tracking URL: http://hostname:8888/proxy/application_<id>/
     user: xkrogen
```
I propose adding, alongside the application report, some additional lines like:
```
     Driver Logs (stdout): http://hostname:8042/node/containerlogs/container_<id>/xkrogen/stdout?start=-4096
     Driver Logs (stderr): http://hostname:8042/node/containerlogs/container_<id>/xkrogen/stderr?start=-4096
```

This information isn't contained in the `ApplicationReport`, so it's necessary to query the ResourceManager REST API. For now I have added this as an always-on feature, but if there is any concern about adding this REST dependency, I think hiding this feature behind an off-by-default flag is reasonable.

### Why are the changes needed?

Typically, the tracking URL can be used to find the logs of the ApplicationMaster/driver while the application is running. Later, the Spark History Server can be used to track this information down, using the stdout/stderr links on the Executors page.

However, in the situation when the driver crashed _before_ writing out a history file, the SHS may not be aware of this application, and thus does not contain links to the driver logs.
When this situation arises, it can be difficult for users to debug further, since they can't easily find their driver logs.

It is possible to reach the logs by using the `yarn logs` commands, but the average Spark user isn't aware of this and shouldn't have to be.

With this information readily available in the logs, users can quickly jump to their driver logs, even if it crashed before the SHS became aware of the application. This has the additional benefit of providing a quick way to access driver logs, which often contain useful information, in a single click (instead of navigating through the Spark UI).

### Does this PR introduce _any_ user-facing change?

Yes, some additional print statements will be created in the application report when using YARN in cluster mode.

### How was this patch tested?

Added unit tests for the parsing logic in `yarn.ClientSuite`. Also tested against a live cluster. When the driver is running:
```
INFO Client: Application report for application_XXXXXXXXX_YYYYYY (state: RUNNING)
INFO Client:
     client token: Token { kind: YARN_CLIENT_TOKEN, service: }
     diagnostics: N/A
     ApplicationMaster host: host.example.com
     ApplicationMaster RPC port: ######
     queue: queue_name
     start time: 1604529046091
     final status: UNDEFINED
     tracking URL: http://host.example.com:8080/proxy/application_XXXXXXXXX_YYYYYY/
     user: xkrogen
     Driver Logs (stdout): http://host.example.com:8042/node/containerlogs/container_e07_XXXXXXXXX_YYYYYY_01_000001/xkrogen/stdout?start=-4096
     Driver Logs (stderr): http://host.example.com:8042/node/containerlogs/container_e07_XXXXXXXXX_YYYYYY_01_000001/xkrogen/stderr?start=-4096
INFO Client: Application report for application_XXXXXXXXX_YYYYYY (state: RUNNING)
```
I confirmed that when the driver has not yet launched, the report does not include the two Driver Logs items. Will omit the output here for brevity since it looks the same.

Closes #30096 from xkrogen/xkrogen-SPARK-33185-yarn-client-print.
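The `Driver Logs (stdout)` and `Driver Logs (stderr)` lines in the live-cluster report above are extra key/value entries appended after the standard report fields, and null or empty values are printed as `N/A` (hence `diagnostics: N/A`). A minimal stdlib-only Scala sketch of that formatting follows; the `ReportFormatSketch` object, its `formatDetails` signature, and the tab-indented `key: value` layout are hypothetical and only approximate the `formatReportDetails` method changed in the diff below.

```scala
object ReportFormatSketch {
  /**
   * Hypothetical helper (not Spark's API): render report fields followed by
   * optional driver log links. Null or empty values are shown as "N/A", and an
   * empty `driverLogsLinks` map adds no lines.
   */
  def formatDetails(
      details: Seq[(String, String)],
      driverLogsLinks: Map[String, String]): String = {
    // Append one "Driver Logs (<file>)" entry per link after the standard fields.
    val withLinks = details ++ driverLogsLinks.toSeq.map { case (fname, link) =>
      (s"Driver Logs ($fname)", link)
    }
    withLinks
      .map { case (k, v) =>
        // Substitute "N/A" for null or empty values.
        val shown = Option(v).filter(_.nonEmpty).getOrElse("N/A")
        s"\t$k: $shown" // assumed layout: one tab-indented "key: value" per line
      }
      .mkString("\n")
  }
}
```

Because the links arrive as a map keyed by file name, passing an empty map (for example when the lookup fails or the feature is disabled) leaves the report exactly as it was.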
Authored-by: Erik Krogen <xkro...@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 73 ++++++++++++++++++++--
 .../org/apache/spark/deploy/yarn/config.scala      |  9 +++
 .../spark/util/YarnContainerInfoHelper.scala       | 14 ++++-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 47 ++++++++++++++
 4 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 517a4af..30ca4a6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,11 +25,16 @@ import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}

 import scala.collection.JavaConverters._
+import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal

+import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Objects
 import com.google.common.io.Files
+import javax.ws.rs.client.ClientBuilder
+import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.Response.Status.Family
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -46,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils

 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.api.python.PythonUtils
@@ -58,7 +64,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{CallerContext, Utils}
+import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}

 private[spark] class Client(
     val args: ClientArguments,
@@ -1080,9 +1086,9 @@ private[spark] class Client(
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formatReportDetails(report))
+          logDebug(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
         } else if (lastState != state) {
-          logInfo(formatReportDetails(report))
+          logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
         }
       }

@@ -1152,7 +1158,17 @@ private[spark] class Client(
     appMaster
   }

-  private def formatReportDetails(report: ApplicationReport): String = {
+  /**
+   * Format an application report and optionally, links to driver logs, in a human-friendly manner.
+   *
+   * @param report The application report from YARN.
+   * @param driverLogsLinks A map of driver log files and their links. Keys are the file names
+   *                        (e.g. `stdout`), and values are the links. If empty, nothing will be
+   *                        printed.
+   * @return Human-readable version of the input data.
+   */
+  private def formatReportDetails(report: ApplicationReport,
+      driverLogsLinks: IMap[String, String]): String = {
     val details = Seq[(String, String)](
       ("client token", getClientToken(report)),
       ("diagnostics", report.getDiagnostics),
@@ -1163,7 +1179,7 @@ private[spark] class Client(
       ("final status", report.getFinalApplicationStatus.toString),
       ("tracking URL", report.getTrackingUrl),
       ("user", report.getUser)
-    )
+    ) ++ driverLogsLinks.map { case (fname, link) => (s"Driver Logs ($fname)", link) }

     // Use more loggable format if value is null or empty
     details.map { case (k, v) =>
@@ -1173,6 +1189,37 @@ private[spark] class Client(
   }

   /**
+   * Fetch links to the logs of the driver for the given application ID. This requires hitting the
+   * RM REST API. Returns an empty map if the links could not be fetched. If this feature is
+   * disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], an empty map is returned immediately.
+   */
+  private def getDriverLogsLink(appId: ApplicationId): IMap[String, String] = {
+    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)) {
+      return IMap()
+    }
+    try {
+      val baseRmUrl = WebAppUtils.getRMWebAppURLWithScheme(hadoopConf)
+      val response = ClientBuilder.newClient()
+        .target(baseRmUrl)
+        .path("ws").path("v1").path("cluster").path("apps")
+        .path(appId.toString).path("appattempts")
+        .request(MediaType.APPLICATION_JSON)
+        .get()
+      response.getStatusInfo.getFamily match {
+        case Family.SUCCESSFUL => parseAppAttemptsJsonResponse(response.readEntity(classOf[String]))
+        case _ =>
+          logWarning(s"Unable to fetch app attempts info from $baseRmUrl, got " +
+            s"status code ${response.getStatus}: ${response.getStatusInfo.getReasonPhrase}")
+          IMap()
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Unable to get driver log links for $appId", e)
+        IMap()
+    }
+  }
+
+  /**
    * Submit an application to the ResourceManager.
    * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
    * reporting the application's status until the application has exited for any reason.
@@ -1186,7 +1233,7 @@ private[spark] class Client(
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
       logInfo(s"Application report for $appId (state: $state)")
-      logInfo(formatReportDetails(report))
+      logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
       if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
         throw new SparkException(s"Application $appId finished with status: $state")
       }
@@ -1577,6 +1624,20 @@ private object Client extends Logging {
     writer.flush()
     out.closeEntry()
   }
+
+  private[yarn] def parseAppAttemptsJsonResponse(jsonString: String): IMap[String, String] = {
+    val objectMapper = new ObjectMapper()
+    // If JSON response is malformed somewhere along the way, MissingNode will be returned,
+    // which allows for safe continuation of chaining. The `elements()` call will be empty,
+    // and None will get returned.
+    objectMapper.readTree(jsonString)
+      .path("appAttempts").path("appAttempt")
+      .elements().asScala.toList.takeRight(1).headOption
+      .map(_.path("logsLink").asText(""))
+      .filterNot(_ == "")
+      .map(baseUrl => YarnContainerInfoHelper.getLogUrlsFromBaseUrl(baseUrl))
+      .getOrElse(IMap())
+  }
 }

 private[spark] class YarnClusterApplication extends SparkApplication {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index f2e838f..89a4af2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -189,6 +189,15 @@ package object config extends Logging {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("1s")

+  private[spark] val CLIENT_INCLUDE_DRIVER_LOGS_LINK =
+    ConfigBuilder("spark.yarn.includeDriverLogsLink")
+      .doc("In cluster mode, whether the client application report includes links to the driver " +
+        "container's logs. This requires polling the ResourceManager's REST API, so it " +
+        "places some additional load on the RM.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /* Shared Client-mode AM / Driver configuration. */

   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
index 5e39422..854fe18 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
@@ -28,6 +28,16 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.internal.Logging

 private[spark] object YarnContainerInfoHelper extends Logging {
+
+  private[this] val DRIVER_LOG_FILE_NAMES = Seq("stdout", "stderr")
+  private[this] val DRIVER_LOG_START_OFFSET = -4096
+
+  def getLogUrlsFromBaseUrl(baseUrl: String): Map[String, String] = {
+    DRIVER_LOG_FILE_NAMES.map { fname =>
+      fname -> s"$baseUrl/$fname?start=$DRIVER_LOG_START_OFFSET"
+    }.toMap
+  }
+
   def getLogUrls(
       conf: Configuration,
       container: Option[Container]): Option[Map[String, String]] = {
@@ -42,9 +52,7 @@ private[spark] object YarnContainerInfoHelper extends Logging {

       val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
       logDebug(s"Base URL for logs: $baseUrl")
-      Some(Map(
-        "stdout" -> s"$baseUrl/stdout?start=-4096",
-        "stderr" -> s"$baseUrl/stderr?start=-4096"))
+      Some(getLogUrlsFromBaseUrl(baseUrl))
     } catch {
       case e: Exception =>
         logInfo("Error while building executor logs - executor logs will not be available", e)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ea3acec..fccb240 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -583,6 +583,53 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }

+  test("SPARK-33185 Parse YARN AppAttempts valid JSON response") {
+    val appIdSuffix = "1500000000000_1234567"
+    val containerId = s"container_e1_${appIdSuffix}_01_000001"
+    val nodeHost = "node.example.com"
+    val jsonString =
+      s"""
+         |{"appAttempts": {
+         |  "appAttempt": [ {
+         |    "id":1,
+         |    "startTime":1600000000000,
+         |    "finishedTime":1600000100000,
+         |    "containerId":"$containerId",
+         |    "nodeHttpAddress":"$nodeHost:8042",
+         |    "nodeId":"node.example.com:8041",
+         |    "logsLink":"http://$nodeHost:8042/node/containerlogs/$containerId/username",
+         |    "blacklistedNodes":"",
+         |    "nodesBlacklistedBySystem":"",
+         |    "appAttemptId":"appattempt_${appIdSuffix}_000001"
+         |  }]
+         |}}
+         |""".stripMargin
+    val logLinkMap = Client.parseAppAttemptsJsonResponse(jsonString)
+    assert(logLinkMap.keySet === Set("stdout", "stderr"))
+    assert(logLinkMap("stdout") ===
+      s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stdout?start=-4096")
+    assert(logLinkMap("stderr") ===
+      s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stderr?start=-4096")
+  }
+
+  test("SPARK-33185 Parse YARN AppAttempts invalid JSON response") {
+    // No "appAttempt" present
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { } }""") === Map())
+
+    // "appAttempt" is empty
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { "appAttempt": [ ] } }""")
+      === Map())
+
+    // logsLink is missing
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"id":1}]}}""")
+      === Map())
+
+    // logsLink is present but empty
+    assert(
+      Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"logsLink":""}]}}""")
+        === Map())
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
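The driver-log lookup added in `getDriverLogsLink` and `parseAppAttemptsJsonResponse` comes down to three steps: build the ResourceManager `ws/v1/cluster/apps/<appId>/appattempts` URL, take the `logsLink` of the last attempt in the response, and expand it into `stdout`/`stderr` links ending in `?start=-4096`. The stdlib-only Scala sketch below mirrors those steps without Jackson or JAX-RS, assuming the JSON has already been parsed; `DriverLogLinksSketch` and its `AppAttempt` case class (a stand-in for one element of the `appAttempt` array) are hypothetical names, not Spark APIs.

```scala
object DriverLogLinksSketch {
  // Same file names and offset as YarnContainerInfoHelper in the diff; the
  // negative `start` selects the tail (last 4096 bytes) of each log file.
  private val LogFileNames = Seq("stdout", "stderr")
  private val StartOffset = -4096

  /** RM REST endpoint that lists the attempts of one application. */
  def appAttemptsUrl(baseRmUrl: String, appId: String): String =
    s"${baseRmUrl.stripSuffix("/")}/ws/v1/cluster/apps/$appId/appattempts"

  /** Expand a container's base log URL into one link per log file. */
  def logUrlsFromBaseUrl(baseUrl: String): Map[String, String] =
    LogFileNames.map(fname => fname -> s"$baseUrl/$fname?start=$StartOffset").toMap

  /** Hypothetical stand-in for one already-parsed element of the `appAttempt` array. */
  final case class AppAttempt(id: Int, logsLink: Option[String])

  /**
   * Use the most recent (last listed) attempt. No attempts, a missing
   * `logsLink`, or an empty `logsLink` all yield an empty map.
   */
  def driverLogLinks(attempts: Seq[AppAttempt]): Map[String, String] =
    attempts.lastOption
      .flatMap(_.logsLink)
      .filter(_.nonEmpty)
      .map(logUrlsFromBaseUrl)
      .getOrElse(Map.empty[String, String])
}
```

For example, `driverLogLinks(Seq(AppAttempt(1, Some(base))))` yields entries `"stdout" -> s"$base/stdout?start=-4096"` and `"stderr" -> s"$base/stderr?start=-4096"`, while an empty attempt list or an empty `logsLink` yields an empty map. Falling back to an empty map on every failure path matches the design in the diff: the report omits the driver log lines rather than failing the client.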