This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c2583  [SPARK-33185][YARN][FOLLOW-ON] Leverage RM's RPC API instead 
of REST to fetch driver log links in yarn.Client
f3c2583 is described below

commit f3c2583cc3ad6a2a24bfb09e2ee7af4e63e5bf66
Author: Erik Krogen <xkro...@apache.org>
AuthorDate: Mon Nov 30 14:40:51 2020 -0600

    [SPARK-33185][YARN][FOLLOW-ON] Leverage RM's RPC API instead of REST to 
fetch driver log links in yarn.Client
    
    ### What changes were proposed in this pull request?
    This is a follow-on to PR #30096 which initially added support for printing 
direct links to the driver stdout/stderr logs from the application report 
output in `yarn.Client` using the `spark.yarn.includeDriverLogsLink` 
configuration. That PR made use of the ResourceManager's REST APIs to fetch the 
necessary information to construct the links. This PR proposes removing the 
dependency on the REST API, since the new logic is the only place in 
`yarn.Client` which makes use of this API, an [...]
    
    ### Why are the changes needed?
    
    While the old logic worked okay when running a Spark application in a 
"standard" environment with full access to Kerberos credentials, it can fail 
when run in an environment with restricted Kerberos credentials. In our case, 
this environment is represented by [Azkaban](https://azkaban.github.io/), but 
it likely affects other job scheduling systems as well. In such an environment, 
the application has delegation tokens which enabled it to communicate with 
services such as YARN, but the  [...]
    
    Besides this enhancement, leveraging the `YarnClient` APIs greatly 
simplifies the processing logic, such as removing all JSON parsing.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Very minimal user-facing changes on top of PR #30096. Basically expands the 
scope of environments in which that feature will operate correctly.
    
    ### How was this patch tested?
    
    In addition to redoing the `spark-submit` testing as mentioned in PR 
#30096, I also tested this logic in a restricted-credentials environment 
(Azkaban). It succeeds where the previous logic would fail with a 401 error.
    
    Closes #30450 from xkrogen/xkrogen-SPARK-33185-driverlogs-followon.
    
    Authored-by: Erik Krogen <xkro...@apache.org>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 67 ++++++++--------------
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 47 ---------------
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 31 ++++++++++
 3 files changed, 54 insertions(+), 91 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 552167c..d252e83 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -29,12 +29,8 @@ import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, 
Map}
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Objects
 import com.google.common.io.Files
-import javax.ws.rs.client.ClientBuilder
-import javax.ws.rs.core.MediaType
-import javax.ws.rs.core.Response.Status.Family
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.api.python.PythonUtils
@@ -1089,9 +1084,9 @@ private[spark] class Client(
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formatReportDetails(report, 
getDriverLogsLink(report.getApplicationId)))
+          logDebug(formatReportDetails(report, getDriverLogsLink(report)))
         } else if (lastState != state) {
-          logInfo(formatReportDetails(report, 
getDriverLogsLink(report.getApplicationId)))
+          logInfo(formatReportDetails(report, getDriverLogsLink(report)))
         }
       }
 
@@ -1192,33 +1187,31 @@ private[spark] class Client(
   }
 
   /**
-   * Fetch links to the logs of the driver for the given application ID. This 
requires hitting the
-   * RM REST API. Returns an empty map if the links could not be fetched. If 
this feature is
-   * disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], an empty map is 
returned immediately.
+   * Fetch links to the logs of the driver for the given application report. 
This requires
+   * query the ResourceManager via RPC. Returns an empty map if the links 
could not be fetched.
+   * If this feature is disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], or 
if the application
+   * report indicates that the driver container isn't currently running, an 
empty map is
+   * returned immediately.
    */
-  private def getDriverLogsLink(appId: ApplicationId): IMap[String, String] = {
-    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)) {
-      return IMap()
+  private def getDriverLogsLink(appReport: ApplicationReport): IMap[String, 
String] = {
+    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)
+      || appReport.getYarnApplicationState != YarnApplicationState.RUNNING) {
+      return IMap.empty
     }
     try {
-      val baseRmUrl = WebAppUtils.getRMWebAppURLWithScheme(hadoopConf)
-      val response = ClientBuilder.newClient()
-          .target(baseRmUrl)
-          .path("ws").path("v1").path("cluster").path("apps")
-          .path(appId.toString).path("appattempts")
-          .request(MediaType.APPLICATION_JSON)
-          .get()
-      response.getStatusInfo.getFamily match {
-        case Family.SUCCESSFUL => 
parseAppAttemptsJsonResponse(response.readEntity(classOf[String]))
-        case _ =>
-          logWarning(s"Unable to fetch app attempts info from $baseRmUrl, got "
-              + s"status code ${response.getStatus}: 
${response.getStatusInfo.getReasonPhrase}")
-          IMap()
-      }
+      Option(appReport.getCurrentApplicationAttemptId)
+        .flatMap(attemptId => 
Option(yarnClient.getApplicationAttemptReport(attemptId)))
+        .flatMap(attemptReport => Option(attemptReport.getAMContainerId))
+        .flatMap(amContainerId => 
Option(yarnClient.getContainerReport(amContainerId)))
+        .flatMap(containerReport => Option(containerReport.getLogUrl))
+        .map(YarnContainerInfoHelper.getLogUrlsFromBaseUrl)
+        .getOrElse(IMap.empty)
     } catch {
       case e: Exception =>
-        logWarning(s"Unable to get driver log links for $appId", e)
-        IMap()
+        logWarning(s"Unable to get driver log links for $appId: $e")
+        // Include the full stack trace only at DEBUG level to reduce verbosity
+        logDebug(s"Unable to get driver log links for $appId", e)
+        IMap.empty
     }
   }
 
@@ -1236,7 +1229,7 @@ private[spark] class Client(
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
       logInfo(s"Application report for $appId (state: $state)")
-      logInfo(formatReportDetails(report, 
getDriverLogsLink(report.getApplicationId)))
+      logInfo(formatReportDetails(report, getDriverLogsLink(report)))
       if (state == YarnApplicationState.FAILED || state == 
YarnApplicationState.KILLED) {
         throw new SparkException(s"Application $appId finished with status: 
$state")
       }
@@ -1627,20 +1620,6 @@ private object Client extends Logging {
     writer.flush()
     out.closeEntry()
   }
-
-  private[yarn] def parseAppAttemptsJsonResponse(jsonString: String): 
IMap[String, String] = {
-    val objectMapper = new ObjectMapper()
-    // If JSON response is malformed somewhere along the way, MissingNode will 
be returned,
-    // which allows for safe continuation of chaining. The `elements()` call 
will be empty,
-    // and None will get returned.
-    objectMapper.readTree(jsonString)
-      .path("appAttempts").path("appAttempt")
-      .elements().asScala.toList.takeRight(1).headOption
-      .map(_.path("logsLink").asText(""))
-      .filterNot(_ == "")
-      .map(baseUrl => YarnContainerInfoHelper.getLogUrlsFromBaseUrl(baseUrl))
-      .getOrElse(IMap())
-  }
 }
 
 private[spark] class YarnClusterApplication extends SparkApplication {
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index fccb240..ea3acec 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -583,53 +583,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
-  test("SPARK-33185 Parse YARN AppAttempts valid JSON response") {
-    val appIdSuffix = "1500000000000_1234567"
-    val containerId = s"container_e1_${appIdSuffix}_01_000001"
-    val nodeHost = "node.example.com"
-    val jsonString =
-      s"""
-        |{"appAttempts": {
-        |  "appAttempt": [ {
-        |    "id":1,
-        |    "startTime":1600000000000,
-        |    "finishedTime":1600000100000,
-        |    "containerId":"$containerId",
-        |    "nodeHttpAddress":"$nodeHost:8042",
-        |    "nodeId":"node.example.com:8041",
-        |    
"logsLink":"http://$nodeHost:8042/node/containerlogs/$containerId/username";,
-        |    "blacklistedNodes":"",
-        |    "nodesBlacklistedBySystem":"",
-        |    "appAttemptId":"appattempt_${appIdSuffix}_000001"
-        |  }]
-        |}}
-        |""".stripMargin
-    val logLinkMap = Client.parseAppAttemptsJsonResponse(jsonString)
-    assert(logLinkMap.keySet === Set("stdout", "stderr"))
-    assert(logLinkMap("stdout") ===
-        
s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stdout?start=-4096";)
-    assert(logLinkMap("stderr") ===
-        
s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stderr?start=-4096";)
-  }
-
-  test("SPARK-33185 Parse YARN AppAttempts invalid JSON response") {
-    // No "appAttempt" present
-    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { } }""") 
=== Map())
-
-    // "appAttempt" is empty
-    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { 
"appAttempt": [ ] } }""")
-        === Map())
-
-    // logsLink is missing
-    
assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"id":1}]}}""")
-        === Map())
-
-    // logsLink is present but empty
-    assert(
-      
Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"logsLink":""}]}}""")
-          === Map())
-  }
-
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index cf754cc..222b24c 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -230,6 +230,37 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }
   }
 
+  test("running Spark in yarn-cluster mode displays driver log links") {
+    val log4jConf = new File(tempDir, "log4j.properties")
+    val logOutFile = new File(tempDir, "logs")
+    Files.write(
+      s"""log4j.rootCategory=DEBUG,file
+         |log4j.appender.file=org.apache.log4j.FileAppender
+         |log4j.appender.file.file=$logOutFile
+         |log4j.appender.file.layout=org.apache.log4j.PatternLayout
+         |""".stripMargin,
+      log4jConf, StandardCharsets.UTF_8)
+    // Since this test is trying to extract log output from the SparkSubmit 
process itself,
+    // standard options to the Spark process don't take effect. Leverage the 
java-opts file which
+    // will get picked up for the SparkSubmit process.
+    val confDir = new File(tempDir, "conf")
+    confDir.mkdir()
+    val javaOptsFile = new File(confDir, "java-opts")
+    Files.write(s"-Dlog4j.configuration=file://$log4jConf\n", javaOptsFile, 
StandardCharsets.UTF_8)
+
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(clientMode = false,
+      mainClassName(YarnClusterDriver.getClass),
+      appArgs = Seq(result.getAbsolutePath),
+      extraEnv = Map("SPARK_CONF_DIR" -> confDir.getAbsolutePath),
+      extraConf = Map(CLIENT_INCLUDE_DRIVER_LOGS_LINK.key -> true.toString))
+    checkResult(finalState, result)
+    val logOutput = Files.toString(logOutFile, StandardCharsets.UTF_8)
+    val logFilePattern = raw"""(?s).+\sDriver Logs \(<NAME>\): 
https?://.+/<NAME>(\?\S+)?\s.+"""
+    logOutput should fullyMatch regex logFilePattern.replace("<NAME>", 
"stdout")
+    logOutput should fullyMatch regex logFilePattern.replace("<NAME>", 
"stderr")
+  }
+
   test("timeout to get SparkContext in cluster mode triggers failure") {
     val timeout = 2000
     val finalState = runSpark(false, 
mainClassName(SparkContextTimeoutApp.getClass),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to