This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae5b2a6  [SPARK-26311][CORE] New feature: apply custom log URL pattern 
for executor log URLs in SHS
ae5b2a6 is described below

commit ae5b2a6a92be4986ef5b8062d7fb59318cff6430
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Wed Jan 30 11:52:30 2019 -0800

    [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor 
log URLs in SHS
    
    ## What changes were proposed in this pull request?
    
    This patch proposes adding a new configuration on SHS: custom executor log 
URL pattern. This will enable end users to replace executor logs to other than 
RM provide, like external log service, which enables to serve executor logs 
when NodeManager becomes unavailable in case of YARN.
    
    End users can build their own of custom executor log URLs with pre-defined 
patterns which would be vary on each resource manager. This patch adds some 
patterns to YARN resource manager. (For others, there's even no executor log 
url available so cannot define patterns as well.)
    
    Please refer the doc change as well as added UTs in this patch to see how 
to set up the feature.
    
    ## How was this patch tested?
    
    Added UT, as well as manual test with YARN cluster
    
    Closes #23260 from HeartSaVioR/SPARK-26311.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala |   3 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |   7 +-
 .../deploy/history/HistoryAppStatusStore.scala     | 133 +++++++++++++
 .../executor/CoarseGrainedExecutorBackend.scala    |   9 +-
 .../org/apache/spark/internal/config/History.scala |  18 ++
 .../apache/spark/scheduler/SchedulerBackend.scala  |   7 +
 .../org/apache/spark/scheduler/SparkListener.scala |   3 +-
 .../cluster/CoarseGrainedClusterMessage.scala      |   3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |   4 +-
 .../spark/scheduler/cluster/ExecutorData.scala     |   5 +-
 .../spark/scheduler/cluster/ExecutorInfo.scala     |  12 +-
 .../scheduler/local/LocalSchedulerBackend.scala    |   3 +-
 .../apache/spark/status/AppStatusListener.scala    |   2 +
 .../scala/org/apache/spark/status/LiveEntity.scala |   4 +-
 .../scala/org/apache/spark/status/api/v1/api.scala |   3 +-
 .../scala/org/apache/spark/util/JsonProtocol.scala |  16 +-
 .../executor_list_json_expectation.json            |   3 +-
 ...ist_with_executor_metrics_json_expectation.json |  24 ++-
 ...utor_process_tree_metrics_json_expectation.json |   6 +-
 .../executor_memory_usage_expectation.json         |  15 +-
 .../executor_node_blacklisting_expectation.json    |  15 +-
 ...de_blacklisting_unblacklisting_expectation.json |  15 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |  16 +-
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |   4 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   4 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 209 ++++++++++++++++++++-
 .../scheduler/EventLoggingListenerSuite.scala      |   3 +-
 .../spark/status/AppStatusListenerSuite.scala      |   7 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  10 +-
 docs/monitoring.md                                 |  24 +++
 docs/running-on-yarn.md                            |  32 +++-
 project/MimaExcludes.scala                         |   6 +
 .../mesos/MesosFineGrainedSchedulerBackend.scala   |   2 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   3 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala    |  15 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  31 ++-
 .../cluster/YarnClusterSchedulerBackend.scala      |  36 +---
 .../spark/util/YarnContainerInfoHelper.scala       | 103 ++++++++++
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  28 ++-
 39 files changed, 717 insertions(+), 126 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2b79b8..6273601 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2355,7 +2355,8 @@ class SparkContext(config: SparkConf) extends Logging {
     // Note: this code assumes that the task scheduler has been initialized 
and has contacted
     // the cluster manager to get an application ID (in case the cluster 
manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, 
Some(applicationId),
-      startTime, sparkUser, applicationAttemptId, 
schedulerBackend.getDriverLogUrls))
+      startTime, sparkUser, applicationAttemptId, 
schedulerBackend.getDriverLogUrls,
+      schedulerBackend.getDriverAttributes))
     _driverLogger.foreach(_.startSync(_hadoopConfiguration))
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 33e89c39..56aea0c 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -359,10 +359,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         return None
     }
 
-    val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, 
secManager, app.info.name,
-      HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
-      attempt.info.startTime.getTime(),
-      attempt.info.appSparkVersion)
+    val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), 
conf, secManager,
+      app.info.name, HistoryServer.getAttemptURI(appId, 
attempt.info.attemptId),
+      attempt.info.startTime.getTime(), attempt.info.appSparkVersion)
     loadPlugins().foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
new file mode 100644
index 0000000..751382c
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.matching.Regex
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History._
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1
+import org.apache.spark.util.kvstore.KVStore
+
+private[spark] class HistoryAppStatusStore(
+    conf: SparkConf,
+    store: KVStore)
+  extends AppStatusStore(store, None) with Logging {
+
+  import HistoryAppStatusStore._
+
+  private val logUrlPattern: Option[String] = {
+    val appInfo = super.applicationInfo()
+    val applicationCompleted = appInfo.attempts.nonEmpty && 
appInfo.attempts.head.completed
+    if (applicationCompleted || 
conf.get(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP)) {
+      conf.get(CUSTOM_EXECUTOR_LOG_URL)
+    } else {
+      None
+    }
+  }
+
+  private val informedForMissingAttributes = new AtomicBoolean(false)
+
+  override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
+    val execList = super.executorList(activeOnly)
+    logUrlPattern match {
+      case Some(pattern) => execList.map(replaceLogUrls(_, pattern))
+      case None => execList
+    }
+  }
+
+  override def executorSummary(executorId: String): v1.ExecutorSummary = {
+    val execSummary = super.executorSummary(executorId)
+    logUrlPattern match {
+      case Some(pattern) => replaceLogUrls(execSummary, pattern)
+      case None => execSummary
+    }
+  }
+
+  private def replaceLogUrls(exec: v1.ExecutorSummary, urlPattern: String): 
v1.ExecutorSummary = {
+    val attributes = exec.attributes
+
+    // Relation between pattern {{FILE_NAME}} and attribute {{LOG_FILES}}
+    // Given that HistoryAppStatusStore don't know which types of log files 
can be provided
+    // from resource manager, we require resource manager to provide available 
types of log
+    // files, which are encouraged to be same as types of log files provided 
in original log URLs.
+    // Once we get the list of log files, we need to expose them to end users 
as a pattern
+    // so that end users can compose custom log URL(s) including log file 
name(s).
+    val allPatterns = 
CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet
+    val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME")
+    val allAttributeKeys = attributes.keySet
+    val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != 
"LOG_FILES")
+
+    if 
(allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) {
+      logFailToRenewLogUrls("some of required attributes are missing in app's 
event log.",
+        allPatternsExceptFileName, allAttributeKeys)
+      return exec
+    } else if (allPatterns.contains("FILE_NAME") && 
!allAttributeKeys.contains("LOG_FILES")) {
+      logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file 
information is " +
+        "missing in app's event log.", allPatternsExceptFileName, 
allAttributeKeys)
+      return exec
+    }
+
+    val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case 
(orig, patt) =>
+      // we already checked the existence of attribute when comparing keys
+      orig.replace(s"{{$patt}}", attributes(patt))
+    }
+
+    val newLogUrlMap = if (allPatterns.contains("FILE_NAME")) {
+      // allAttributeKeys should contain "LOG_FILES"
+      attributes("LOG_FILES").split(",").map { file =>
+        file -> updatedUrl.replace("{{FILE_NAME}}", file)
+      }.toMap
+    } else {
+      Map("log" -> updatedUrl)
+    }
+
+    replaceExecutorLogs(exec, newLogUrlMap)
+  }
+
+  private def logFailToRenewLogUrls(
+      reason: String,
+      allPatterns: Set[String],
+      allAttributes: Set[String]): Unit = {
+    if (informedForMissingAttributes.compareAndSet(false, true)) {
+      logInfo(s"Fail to renew executor log urls: $reason. Required: 
$allPatterns / " +
+        s"available: $allAttributes. Falling back to show app's original log 
urls.")
+    }
+  }
+
+  private def replaceExecutorLogs(
+      source: v1.ExecutorSummary,
+      newExecutorLogs: Map[String, String]): v1.ExecutorSummary = {
+    new v1.ExecutorSummary(source.id, source.hostPort, source.isActive, 
source.rddBlocks,
+      source.memoryUsed, source.diskUsed, source.totalCores, source.maxTasks, 
source.activeTasks,
+      source.failedTasks, source.completedTasks, source.totalTasks, 
source.totalDuration,
+      source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
+      source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, 
source.addTime,
+      source.removeTime, source.removeReason, newExecutorLogs, 
source.memoryMetrics,
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes)
+  }
+
+}
+
+private[spark] object HistoryAppStatusStore {
+  val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
+}
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 48d3630..2865c3b 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -60,7 +60,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, 
extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, 
extractLogUrls,
+        extractAttributes))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -76,6 +77,12 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
   }
 
+  def extractAttributes: Map[String, String] = {
+    val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala 
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index e7d25bf..1d73f01 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -125,4 +125,22 @@ private[spark] object History {
   val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
     .stringConf
     .createOptional
+
+  val CUSTOM_EXECUTOR_LOG_URL = 
ConfigBuilder("spark.history.custom.executor.log.url")
+    .doc("Specifies custom spark executor log url for supporting external log 
service instead of " +
+      "using cluster managers' application log urls in the history server. 
Spark will support " +
+      "some path variables via patterns which can vary on cluster manager. 
Please check the " +
+      "documentation for your cluster manager to see which patterns are 
supported, if any. " +
+      "This configuration has no effect on a live application, it only affects 
the history server.")
+    .stringConf
+    .createOptional
+
+  val APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP =
+    
ConfigBuilder("spark.history.custom.executor.log.url.applyIncompleteApplication")
+      .doc("Whether to apply custom executor log url, as specified by " +
+        "`spark.history.custom.executor.log.url`, to incomplete application as 
well. " +
+        "Even if this is true, this still only affects the behavior of the 
history server, " +
+        "not running spark applications.")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index c187ee1..9159d2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -70,6 +70,13 @@ private[spark] trait SchedulerBackend {
   def getDriverLogUrls: Option[Map[String, String]] = None
 
   /**
+   * Get the attributes on driver. These attributes are used to replace log 
URLs when
+   * custom log url pattern is specified.
+   * @return Map containing attributes on driver.
+   */
+  def getDriverAttributes: Option[Map[String, String]] = None
+
+  /**
    * Get the max number of tasks that can be concurrent launched currently.
    * Note that please don't cache the value returned by this method, because 
the number can change
    * due to add/remove executors.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e92b8a2..b332c0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -192,7 +192,8 @@ case class SparkListenerApplicationStart(
     time: Long,
     sparkUser: String,
     appAttemptId: Option[String],
-    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
+    driverLogs: Option[Map[String, String]] = None,
+    driverAttributes: Option[Map[String, String]] = None) extends 
SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9e768c2..afb48a31 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -63,7 +63,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      attributes: Map[String, String])
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0111b83..fb28e90 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -183,7 +183,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 
-      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) 
=>
+      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, 
attributes) =>
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + 
executorId))
           context.reply(true)
@@ -207,7 +207,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val data = new ExecutorData(executorRef, executorAddress, hostname,
-            cores, cores, logUrls)
+            cores, cores, logUrls, attributes)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b25a4bf..ebe1c1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -34,5 +34,6 @@ private[cluster] class ExecutorData(
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
-   override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+   override val logUrlMap: Map[String, String],
+   override val attributes: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes)
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 7f21856..3197e06 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -26,7 +26,12 @@ import org.apache.spark.annotation.DeveloperApi
 class ExecutorInfo(
    val executorHost: String,
    val totalCores: Int,
-   val logUrlMap: Map[String, String]) {
+   val logUrlMap: Map[String, String],
+   val attributes: Map[String, String]) {
+
+  def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, 
String]) = {
+    this(executorHost, totalCores, logUrlMap, Map.empty)
+  }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +40,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        attributes == that.attributes
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap, attributes)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 6ff8bf2..fde2a32 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -129,7 +129,8 @@ private[spark] class LocalSchedulerBackend(
     listenerBus.post(SparkListenerExecutorAdded(
       System.currentTimeMillis,
       executorEndpoint.localExecutorId,
-      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, 
Map.empty)))
+      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, 
Map.empty,
+        Map.empty)))
     launcherBackend.setAppId(appId)
     launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 1067cdc..f69c7dd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -131,6 +131,7 @@ private[spark] class AppStatusListener(
         .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER))
       driver.foreach { d =>
         d.executorLogs = logs.toMap
+        d.attributes = event.driverAttributes.getOrElse(Map.empty).toMap
         update(d, System.nanoTime())
       }
     }
@@ -190,6 +191,7 @@ private[spark] class AppStatusListener(
     exec.totalCores = event.executorInfo.totalCores
     exec.maxTasks = event.executorInfo.totalCores / coresPerTask
     exec.executorLogs = event.executorInfo.logUrlMap
+    exec.attributes = event.executorInfo.attributes
     liveUpdate(exec, System.nanoTime())
   }
 
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 7f7b83a..6d7b34a 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -258,6 +258,7 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   var blacklistedInStages: Set[Int] = TreeSet()
 
   var executorLogs = Map[String, String]()
+  var attributes = Map[String, String]()
 
   // Memory metrics. They may not be recorded (e.g. old event logs) so if 
totalOnHeap is not
   // initialized, the store will not contain this information.
@@ -306,7 +307,8 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
       executorLogs,
       memoryMetrics,
       blacklistedInStages,
-      Some(peakExecutorMetrics).filter(_.isSet))
+      Some(peakExecutorMetrics).filter(_.isSet),
+      attributes)
     new ExecutorSummaryWrapper(info)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 825fc54..60b8bf8 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -106,7 +106,8 @@ class ExecutorSummary private[spark](
     val blacklistedInStages: Set[Int],
     @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
     @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
-    val peakMemoryMetrics: Option[ExecutorMetrics])
+    val peakMemoryMetrics: Option[ExecutorMetrics],
+    val attributes: Map[String, String])
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3370152..d84dd58 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -211,7 +211,8 @@ private[spark] object JsonProtocol {
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser) ~
     ("App Attempt ID" -> 
applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
-    ("Driver Logs" -> 
applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
+    ("Driver Logs" -> 
applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) ~
+    ("Driver Attributes" -> 
applicationStart.driverAttributes.map(mapToJson).getOrElse(JNothing))
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): 
JValue = {
@@ -486,7 +487,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Attributes" -> mapToJson(executorInfo.attributes))
   }
 
   def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -693,7 +695,9 @@ private[spark] object JsonProtocol {
     val sparkUser = (json \ "User").extract[String]
     val appAttemptId = jsonOption(json \ "App Attempt 
ID").map(_.extract[String])
     val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
-    SparkListenerApplicationStart(appName, appId, time, sparkUser, 
appAttemptId, driverLogs)
+    val driverAttributes = jsonOption(json \ "Driver 
Attributes").map(mapFromJson)
+    SparkListenerApplicationStart(appName, appId, time, sparkUser, 
appAttemptId, driverLogs,
+      driverAttributes)
   }
 
   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
@@ -1061,7 +1065,11 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val attributes = jsonOption(json \ "Attributes") match {
+      case Some(attr) => mapFromJson(attr).toMap
+      case None => Map.empty[String, String]
+    }
+    new ExecutorInfo(executorHost, totalCores, logUrls, attributes)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 7bb8fe8..209164e 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -20,5 +20,6 @@
   "maxMemory" : 278302556,
   "addTime" : "2015-02-03T16:43:00.906GMT",
   "executorLogs" : { },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
index 7567477..f282d483 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
@@ -44,7 +44,8 @@
     "ProcessTreePythonRSSMemory": 0,
     "ProcessTreeOtherVMemory": 0,
     "ProcessTreeOtherRSSMemory": 0
-  }
+  },
+  "attributes" : { }
 }, {
   "id" : "7",
   "hostPort" : "node6340.grid.company.com:5933",
@@ -76,7 +77,8 @@
     "totalOnHeapStorageMemory" : 956615884,
     "totalOffHeapStorageMemory" : 0
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "6",
   "hostPort" : "node6644.grid.company.com:8445",
@@ -108,7 +110,8 @@
     "totalOnHeapStorageMemory" : 956615884,
     "totalOffHeapStorageMemory" : 0
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "5",
   "hostPort" : "node2477.grid.company.com:20123",
@@ -140,7 +143,8 @@
     "totalOnHeapStorageMemory" : 956615884,
     "totalOffHeapStorageMemory" : 0
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "4",
   "hostPort" : "node4243.grid.company.com:16084",
@@ -190,7 +194,8 @@
     "ProcessTreePythonRSSMemory": 0,
     "ProcessTreeOtherVMemory": 0,
     "ProcessTreeOtherRSSMemory": 0
-  }
+  },
+  "attributes" : { }
 }, {
   "id" : "3",
   "hostPort" : "node0998.grid.company.com:45265",
@@ -240,7 +245,8 @@
     "ProcessTreePythonRSSMemory": 0,
     "ProcessTreeOtherVMemory": 0,
     "ProcessTreeOtherRSSMemory": 0
-  }
+  },
+  "attributes" : { }
 }, {
   "id" : "2",
   "hostPort" : "node4045.grid.company.com:29262",
@@ -290,7 +296,8 @@
     "ProcessTreePythonRSSMemory": 0,
     "ProcessTreeOtherVMemory": 0,
     "ProcessTreeOtherRSSMemory": 0
-  }
+  },
+  "attributes" : { }
 }, {
   "id" : "1",
   "hostPort" : "node1404.grid.company.com:34043",
@@ -340,5 +347,6 @@
     "ProcessTreePythonRSSMemory": 0,
     "ProcessTreeOtherVMemory": 0,
     "ProcessTreeOtherRSSMemory": 0
-  }
+  },
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json
index 69efefe..980ec85 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json
@@ -44,7 +44,8 @@
     "ProcessTreePythonRSSMemory" : 40284160,
     "ProcessTreeOtherVMemory" : 0,
     "ProcessTreeOtherRSSMemory" : 0
-  }
+  },
+  "attributes" : { }
 }, {
   "id" : "9",
   "hostPort" : "rezamemory-2.gce.something.com:40797",
@@ -94,5 +95,6 @@
     "ProcessTreePythonRSSMemory" : 69013504,
     "ProcessTreeOtherVMemory" : 0,
     "ProcessTreeOtherRSSMemory" : 0
-  }
+  },
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
index dd5b1dc..c7f0a86 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -26,7 +26,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -58,7 +59,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 } ,{
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -90,7 +92,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -122,7 +125,8 @@
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -154,5 +158,6 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 3e55d3d..02a0ecd 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -26,7 +26,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -58,7 +59,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -90,7 +92,8 @@
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -122,7 +125,8 @@
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -154,5 +158,6 @@
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
index e87f3e7..4d31a38 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
@@ -20,7 +20,8 @@
   "maxMemory" : 384093388,
   "addTime" : "2016-11-15T23:20:38.836GMT",
   "executorLogs" : { },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.111:64543",
@@ -46,7 +47,8 @@
     "stdout" : 
"http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr";
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.111:64539",
@@ -72,7 +74,8 @@
     "stdout" : 
"http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr";
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.111:64541",
@@ -98,7 +101,8 @@
     "stdout" : 
"http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr";
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.111:64540",
@@ -124,5 +128,6 @@
     "stdout" : 
"http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr";
   },
-  "blacklistedInStages" : [ ]
+  "blacklistedInStages" : [ ],
+  "attributes" : { }
 } ]
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index fdaea28..ce0ef2e 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -201,7 +201,7 @@ class ExecutorAllocationManagerSuite
     // Verify that running a task doesn't affect the target
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3)))
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 1)
@@ -809,13 +809,13 @@ class ExecutorAllocationManagerSuite
 
     // New executors have registered
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(removeTimes(manager).contains("executor-1"))
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty)))
+      0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty, Map.empty)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 2)
@@ -842,7 +842,7 @@ class ExecutorAllocationManagerSuite
 
     post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
@@ -854,7 +854,7 @@ class ExecutorAllocationManagerSuite
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
 
     assert(executorIds(manager).size === 1)
@@ -862,7 +862,7 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).size === 0)
 
     post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty)))
+      0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 1)
@@ -1112,7 +1112,7 @@ class ExecutorAllocationManagerSuite
     // test setup -- job with 2 tasks, scale up to two executors
     assert(numExecutorsTarget(manager) === 1)
     manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
-      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, 
Map.empty)))
+      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, 
Map.empty, Map.empty)))
     
manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0,
 2)))
     clock.advance(1000)
     manager invokePrivate 
_updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
@@ -1120,7 +1120,7 @@ class ExecutorAllocationManagerSuite
     val taskInfo0 = createTaskInfo(0, 0, "executor-1")
     manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0))
     manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
-      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, 
Map.empty)))
+      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, 
Map.empty, Map.empty)))
     val taskInfo1 = createTaskInfo(1, 1, "executor-2")
     manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1))
     assert(numExecutorsTarget(manager) === 2)
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index dbe187d..061aeb3 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", 
dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", 
dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, 
Map.empty))
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, 
Map.empty, Map.empty))
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, 
Map.empty))
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, 
Map.empty, Map.empty))
     heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index a6d5066..573a496 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -497,7 +497,7 @@ class StandaloneDynamicAllocationSuite
     val endpointRef = mock(classOf[RpcEndpointRef])
     val mockAddress = mock(classOf[RpcAddress])
     when(endpointRef.address).thenReturn(mockAddress)
-    val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, 
Map.empty)
+    val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, 
Map.empty, Map.empty)
 
     // Get "localhost" on a blacklist.
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
@@ -621,7 +621,7 @@ class StandaloneDynamicAllocationSuite
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
-      val message = RegisterExecutor(id, endpointRef, "localhost", 10, 
Map.empty)
+      val message = RegisterExecutor(id, endpointRef, "localhost", 10, 
Map.empty, Map.empty)
       val backend = 
sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
     }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 74574e2..bce1733 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -47,8 +47,9 @@ import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, 
USER_GROUPS_MAPPING}
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.security.GroupMappingServiceProvider
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusStore, ExecutorSummaryWrapper}
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 import org.apache.spark.util.logging.DriverLogger
@@ -291,6 +292,192 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     }
   }
 
+  test("log urls without customization") {
+    val conf = createTestConf()
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      execInfo -> execInfo.logUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected)
+  }
+
+  test("custom log urls, including FILE_NAME") {
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
true))
+
+    // some of available attributes are not used in pattern which should be OK
+
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      val attr = execInfo.attributes
+      val newLogUrlMap = attr("LOG_FILES").split(",").map { file =>
+        val newLogUrl = getExpectedExecutorLogUrl(attr, Some(file))
+        file -> newLogUrl
+      }.toMap
+
+      execInfo -> newLogUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected)
+  }
+
+  test("custom log urls, excluding FILE_NAME") {
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
false))
+
+    // some of available attributes are not used in pattern which should be OK
+
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      val attr = execInfo.attributes
+      val newLogUrl = getExpectedExecutorLogUrl(attr, None)
+
+      execInfo -> Map("log" -> newLogUrl)
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected)
+  }
+
+  test("custom log urls with invalid attribute") {
+    // Here we are referring {{NON_EXISTING}} which is not available in 
attributes,
+    // which Spark will fail back to provide origin log url with warning log.
+
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
true) +
+        "/{{NON_EXISTING}}")
+
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      execInfo -> execInfo.logUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected)
+  }
+
+  test("custom log urls, LOG_FILES not available while FILE_NAME is 
specified") {
+    // For this case Spark will fail back to provide origin log url with 
warning log.
+
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
true))
+
+    val executorInfos = (1 to 5).map(
+      createTestExecutorInfo("app1", "user1", _, includingLogFiles = false))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      execInfo -> execInfo.logUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected)
+  }
+
+  test("custom log urls, app not finished, applyIncompleteApplication: true") {
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
true))
+      .set(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP, true)
+
+    // ensure custom log urls are applied to incomplete application
+
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      val attr = execInfo.attributes
+      val newLogUrlMap = attr("LOG_FILES").split(",").map { file =>
+        val newLogUrl = getExpectedExecutorLogUrl(attr, Some(file))
+        file -> newLogUrl
+      }.toMap
+
+      execInfo -> newLogUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected, isCompletedApp = false)
+  }
+
+  test("custom log urls, app not finished, applyIncompleteApplication: false") 
{
+    val conf = createTestConf()
+      .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = 
true))
+      .set(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP, false)
+
+    // ensure custom log urls are NOT applied to incomplete application
+
+    val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", 
_))
+
+    val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { 
execInfo =>
+      execInfo -> execInfo.logUrlMap
+    }.toMap
+
+    testHandlingExecutorLogUrl(conf, expected, isCompletedApp = false)
+  }
+
+  private def getCustomExecutorLogUrl(includeFileName: Boolean): String = {
+    val baseUrl = 
"http://newhost:9999/logs/clusters/{{CLUSTER_ID}}/users/{{USER}}/containers/"; +
+      "{{CONTAINER_ID}}"
+    if (includeFileName) baseUrl + "/{{FILE_NAME}}" else baseUrl
+  }
+
+  private def getExpectedExecutorLogUrl(
+      attributes: Map[String, String],
+      fileName: Option[String]): String = {
+    val baseUrl = 
s"http://newhost:9999/logs/clusters/${attributes("CLUSTER_ID")}" +
+      s"/users/${attributes("USER")}/containers/${attributes("CONTAINER_ID")}"
+
+    fileName match {
+      case Some(file) => baseUrl + s"/$file"
+      case None => baseUrl
+    }
+  }
+
+  private def testHandlingExecutorLogUrl(
+      conf: SparkConf,
+      expectedLogUrlMap: Map[ExecutorInfo, Map[String, String]],
+      isCompletedApp: Boolean = true): Unit = {
+    val provider = new FsHistoryProvider(conf)
+
+    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
+
+    val executorAddedEvents = expectedLogUrlMap.keys.zipWithIndex.map { case 
(execInfo, idx) =>
+      SparkListenerExecutorAdded(1 + idx, s"exec$idx", execInfo)
+    }.toList.sortBy(_.time)
+    val allEvents = List(SparkListenerApplicationStart("app1", Some("app1"), 
1L,
+      "test", Some("attempt1"))) ++ executorAddedEvents ++
+      (if (isCompletedApp) List(SparkListenerApplicationEnd(1000L)) else Seq())
+
+    writeFile(attempt1, true, None, allEvents: _*)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (1)
+
+      list.foreach { app =>
+        app.attempts.foreach { attempt =>
+          val appUi = provider.getAppUI(app.id, attempt.attemptId)
+          appUi should not be null
+          val executors = appUi.get.ui.store.executorList(false).iterator
+          executors should not be null
+
+          val iterForExpectation = expectedLogUrlMap.iterator
+
+          var executorCount = 0
+          while (executors.hasNext) {
+            val executor = executors.next()
+            val (expectedExecInfo, expectedLogs) = iterForExpectation.next()
+
+            executor.hostPort should startWith(expectedExecInfo.executorHost)
+            executor.executorLogs should be(expectedLogs)
+
+            executorCount += 1
+          }
+
+          executorCount should be (expectedLogUrlMap.size)
+        }
+      }
+    }
+  }
+
   test("log cleaner") {
     val maxAge = TimeUnit.SECONDS.toMillis(10)
     val clock = new ManualClock(maxAge / 2)
@@ -1048,6 +1235,26 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     conf
   }
 
+  private def createTestExecutorInfo(
+      appId: String,
+      user: String,
+      executorSeqNum: Int,
+      includingLogFiles: Boolean = true): ExecutorInfo = {
+    val host = s"host$executorSeqNum"
+    val container = s"container$executorSeqNum"
+    val cluster = s"cluster$executorSeqNum"
+    val logUrlPrefix = s"http://$host:8888/$appId/$container/origin";
+
+    val executorLogUrlMap = Map("stdout" -> s"$logUrlPrefix/stdout",
+      "stderr" -> s"$logUrlPrefix/stderr")
+
+    val extraAttributes = if (includingLogFiles) Map("LOG_FILES" -> 
"stdout,stderr") else Map.empty
+    val executorAttributes = Map("CONTAINER_ID" -> container, "CLUSTER_ID" -> 
cluster,
+      "USER" -> user) ++ extraAttributes
+
+    new ExecutorInfo(host, 1, executorLogUrlMap, executorAttributes)
+  }
+
   private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
     extends FsHistoryProvider(conf, clock) {
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 811b9fe..40521f0 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -422,7 +422,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
   }
 
   private def createExecutorAddedEvent(executorId: Int) = {
-    SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+    SparkListenerExecutorAdded(0L, executorId.toString,
+      new ExecutorInfo("host1", 1, Map.empty, Map.empty))
   }
 
   private def createExecutorRemovedEvent(executorId: Int) = {
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ede9466..356e6d1 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -135,7 +135,7 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 
     execIds.foreach { id =>
       listener.onExecutorAdded(SparkListenerExecutorAdded(time, id,
-        new ExecutorInfo(s"$id.example.com", 1, Map())))
+        new ExecutorInfo(s"$id.example.com", 1, Map.empty, Map.empty)))
     }
 
     execIds.foreach { id =>
@@ -685,7 +685,7 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
     val bm2 = BlockManagerId("2", "2.example.com", 84)
     Seq(bm1, bm2).foreach { bm =>
       listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
-        new ExecutorInfo(bm.host, 1, Map())))
+        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
       listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 
maxMemory))
       check[ExecutorSummaryWrapper](bm.executorId) { exec =>
         assert(exec.info.maxMemory === maxMemory)
@@ -1553,7 +1553,8 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 
   /** Create an executor added event for the specified executor Id. */
   private def createExecutorAddedEvent(executorId: Int) = {
-    SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
+    SparkListenerExecutorAdded(0L, executorId.toString,
+      new ExecutorInfo("host1", 1, Map.empty, Map.empty))
   }
 
   /** Create an executor added event for the specified executor Id. */
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c63f04d..c3ff379 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -76,13 +76,14 @@ class JsonProtocolSuite extends SparkFunSuite {
       BlockManagerId("Scarce", "to be counted...", 100))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap
     val applicationStart = SparkListenerApplicationStart("The winner of all", 
Some("appId"),
       42L, "Garfield", Some("appAttempt"))
     val applicationStartWithLogs = SparkListenerApplicationStart("The winner 
of all", Some("appId"),
       42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, 
"exec2", "test reason")
     val executorBlacklisted = 
SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
     val executorUnblacklisted =
@@ -138,13 +139,14 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("Dependent Classes") {
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput 
= false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, attributes))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -1858,6 +1860,10 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Log Urls" : {
       |      "stderr" : "mystderr",
       |      "stdout" : "mystdout"
+      |    },
+      |    "Attributes" : {
+      |      "ContainerId" : "ct1",
+      |      "User" : "spark"
       |    }
       |  }
       |}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6bb620a..1e292d2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -248,6 +248,30 @@ Security options for the Spark History Server are covered 
more detail in the
         written to disk will be re-used in the event of a history server 
restart.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.custom.executor.log.url</td>
+    <td>(none)</td>
+    <td>
+        Specifies custom spark executor log URL for supporting external log 
service instead of using cluster
+        managers' application log URLs in the history server. Spark will 
support some path variables via patterns
+        which can vary on cluster manager. Please check the documentation for 
your cluster manager to
+        see which patterns are supported, if any. This configuration has no 
effect on a live application, it only
+        affects the history server.
+        <p/>
+        For now, only YARN mode supports this configuration
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.custom.executor.log.url.applyIncompleteApplication</td>
+    <td>false</td>
+    <td>
+        Specifies whether to apply custom spark executor log URL to incomplete 
applications as well.
+        If executor logs for running applications should be provided as origin 
log URLs, set this to `false`.
+        Please note that incomplete applications may include applications 
which didn't shutdown gracefully.
+        Even this is set to `true`, this configuration has no effect on a live 
application, it only affects the history server.
+    </td>
+  </tr>
+
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their 
headers,
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index a7a448f..c3ebadb 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -472,6 +472,36 @@ To use a custom metrics.properties for the application 
master and executors, upd
 </tr>
 </table>
 
+#### Available patterns for SHS custom executor log URL
+
+<table class="table">
+    <tr><th>Pattern</th><th>Meaning</th></tr>
+    <tr>
+      <td>{{HTTP_SCHEME}}</td>
+      <td>`http://` or `https://` according to YARN HTTP policy. (Configured 
via `yarn.http.policy`)</td>
+    </tr>
+    <tr>
+      <td>{{NM_HTTP_ADDRESS}}</td>
+      <td>Http URI of the node on which the container is allocated.</td>
+    </tr>
+    <tr>
+      <td>{{CLUSTER_ID}}</td>
+      <td>The cluster ID of Resource Manager. (Configured via 
`yarn.resourcemanager.cluster-id`)</td>
+    </tr>
+    <tr>
+      <td>{{CONTAINER_ID}}</td>
+      <td>The ID of container.</td>
+    </tr>
+    <tr>
+      <td>{{USER}}</td>
+      <td>'SPARK_USER' on system environment.</td>
+    </tr>
+    <tr>
+      <td>{{FILE_NAME}}</td>
+      <td>`stdout`, `stderr`.</td>
+    </tr>
+</table>
+
 # Important notes
 
 - Whether core requests are honored in scheduling decisions depends on which 
scheduler is in use and how it is configured.
@@ -659,4 +689,4 @@ do the following:
 - On the Spark History Server, add 
<code>org.apache.spark.deploy.yarn.YarnProxyRedirectFilter</code>
   to the list of filters in the <code>spark.ui.filters</code> configuration.
 
-Be aware that the history server information may not be up-to-date with the 
application's state.
+Be aware that the history server information may not be up-to-date with the 
application's state.
\ No newline at end of file
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a13ee51..5d5d01f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,6 +39,12 @@ object MimaExcludes {
     // [SPARK-26254][CORE] Extract Hive + Kafka dependencies from Core.
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.deploy.security.HiveDelegationTokenProvider"),
 
+    // [SPARK-26311][CORE]New feature: apply custom log URL pattern for 
executor log URLs
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"),
+    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"),
+
     // [SPARK-25765][ML] Add training cost to BisectingKMeans summary
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel.this"),
 
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 192f940..c5c7842 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -328,7 +328,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
         slaveIdToWorkerOffer.get(slaveId).foreach(o =>
           
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
             // TODO: Add support for log urls for Mesos
-            new ExecutorInfo(o.host, o.cores, Map.empty)))
+            new ExecutorInfo(o.host, o.cores, Map.empty, Map.empty)))
         )
         logTrace(s"Launching Mesos tasks on slave '$slaveId', 
tasks:\n${getTasksSummary(tasks)}")
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), 
tasks, filters)
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f2fd5e6..a4e9cc3 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -687,7 +687,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
   private def registerMockExecutor(executorId: String, slaveId: String, cores: 
Integer) = {
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
-    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, 
cores, Map.empty)
+    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, 
cores, Map.empty,
+      Map.empty)
 
     backend.driverEndpoint.askSync[Boolean](message)
   }
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index c9b7e6c..0b6d93f 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -86,7 +86,8 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+      SparkListenerExecutorAdded(anyLong, "s1",
+        new ExecutorInfo("host1", 2, Map.empty, Map.empty)))
 
     val sc = mock[SparkContext]
     when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
@@ -117,7 +118,8 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+      SparkListenerExecutorAdded(anyLong, "s1",
+        new ExecutorInfo("host1", 2, Map.empty, Map.empty)))
 
     val sc = mock[SparkContext]
     when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
@@ -157,7 +159,8 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+      SparkListenerExecutorAdded(anyLong, "s1",
+        new ExecutorInfo("host1", 2, Map.empty, Map.empty)))
 
     val sc = mock[SparkContext]
     when(sc.executorMemory).thenReturn(100)
@@ -217,7 +220,8 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+      SparkListenerExecutorAdded(anyLong, "s1",
+        new ExecutorInfo("host1", 2, Map.empty, Map.empty)))
 
     val sc = mock[SparkContext]
     when(sc.executorMemory).thenReturn(100)
@@ -308,7 +312,8 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val listenerBus = mock[LiveListenerBus]
     listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty)))
+      SparkListenerExecutorAdded(anyLong, "s1",
+        new ExecutorInfo("host1", 2, Map.empty, Map.empty)))
 
     val sc = mock[SparkContext]
     when(sc.executorMemory).thenReturn(100)
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 77ce2f6..0b909d1 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -19,11 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.nio.ByteBuffer
-import java.util.Collections
+import java.util.{Collections, Locale}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
 
+import org.apache.hadoop.HadoopIllegalArgumentException
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
@@ -33,13 +34,13 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.NMClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, YarnContainerInfoHelper}
 
 private[yarn] class ExecutorRunnable(
     container: Option[Container],
@@ -220,13 +221,6 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, 
sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    // lookup appropriate http scheme for container log urls
-    val yarnHttpPolicy = conf.get(
-      YarnConfiguration.YARN_HTTP_POLICY_KEY,
-      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
-    )
-    val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://"; else 
"http://";
-
     System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
       .foreach { case (k, v) => env(k) = v }
 
@@ -241,15 +235,18 @@ private[yarn] class ExecutorRunnable(
       }
     }
 
-    // Add log urls
+    // Add log urls, as well as executor attributes
     container.foreach { c =>
-      sys.env.filterKeys(_.endsWith("USER")).foreach { user =>
-        val containerId = ConverterUtils.toString(c.getId)
-        val address = c.getNodeHttpAddress
-        val baseUrl = 
s"$httpScheme$address/node/containerlogs/$containerId/$user"
+      YarnContainerInfoHelper.getLogUrls(conf, Some(c)).foreach { m =>
+        m.foreach { case (fileName, url) =>
+          env("SPARK_LOG_URL_" + fileName.toUpperCase(Locale.ROOT)) = url
+        }
+      }
 
-        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
-        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+      YarnContainerInfoHelper.getAttributes(conf, Some(c)).foreach { m =>
+        m.foreach { case (attr, value) =>
+          env("SPARK_EXECUTOR_ATTRIBUTE_" + attr.toUpperCase(Locale.ROOT)) = 
value
+        }
       }
     }
 
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index e6680e1..b5575a1 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,14 +17,10 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
+import org.apache.spark.util.YarnContainerInfoHelper
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,30 +36,10 @@ private[spark] class YarnClusterSchedulerBackend(
   }
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
-    var driverLogs: Option[Map[String, String]] = None
-    try {
-      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
-      val containerId = YarnSparkHadoopUtil.getContainerId
+    YarnContainerInfoHelper.getLogUrls(sc.hadoopConfiguration, container = 
None)
+  }
 
-      val httpAddress = System.getenv(Environment.NM_HOST.name()) +
-        ":" + System.getenv(Environment.NM_HTTP_PORT.name())
-      // lookup appropriate http scheme for container log urls
-      val yarnHttpPolicy = yarnConf.get(
-        YarnConfiguration.YARN_HTTP_POLICY_KEY,
-        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
-      )
-      val user = Utils.getCurrentUserName()
-      val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://"; else 
"http://";
-      val baseUrl = 
s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
-      logDebug(s"Base URL for logs: $baseUrl")
-      driverLogs = Some(Map(
-        "stdout" -> s"$baseUrl/stdout?start=-4096",
-        "stderr" -> s"$baseUrl/stderr?start=-4096"))
-    } catch {
-      case e: Exception =>
-        logInfo("Error while building AM log links, so AM" +
-          " logs link will not appear in application UI", e)
-    }
-    driverLogs
+  override def getDriverAttributes: Option[Map[String, String]] = {
+    YarnContainerInfoHelper.getAttributes(sc.hadoopConfiguration, container = 
None)
   }
 }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
new file mode 100644
index 0000000..96350cd
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.hadoop.HadoopIllegalArgumentException
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.internal.Logging
+
+private[spark] object YarnContainerInfoHelper extends Logging {
+  def getLogUrls(
+      conf: Configuration,
+      container: Option[Container]): Option[Map[String, String]] = {
+    try {
+      val yarnConf = new YarnConfiguration(conf)
+
+      val containerId = getContainerId(container)
+      val user = Utils.getCurrentUserName()
+      val httpScheme = getYarnHttpScheme(yarnConf)
+      val httpAddress = getNodeManagerHttpAddress(container)
+
+      val baseUrl = 
s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
+      logDebug(s"Base URL for logs: $baseUrl")
+
+      Some(Map(
+        "stdout" -> s"$baseUrl/stdout?start=-4096",
+        "stderr" -> s"$baseUrl/stderr?start=-4096"))
+    } catch {
+      case e: Exception =>
+        logInfo("Error while building executor logs - executor logs will not 
be available", e)
+        None
+    }
+  }
+
+  def getAttributes(
+      conf: Configuration,
+      container: Option[Container]): Option[Map[String, String]] = {
+    try {
+      val yarnConf = new YarnConfiguration(conf)
+      Some(Map(
+        "HTTP_SCHEME" -> getYarnHttpScheme(yarnConf),
+        "NM_HTTP_ADDRESS" -> getNodeManagerHttpAddress(container),
+        "CLUSTER_ID" -> getClusterId(yarnConf).getOrElse(""),
+        "CONTAINER_ID" -> ConverterUtils.toString(getContainerId(container)),
+        "USER" -> Utils.getCurrentUserName(),
+        "LOG_FILES" -> "stderr,stdout"
+      ))
+    } catch {
+      case e: Exception =>
+        logInfo("Error while retrieving executor attributes - executor logs 
will not be replaced " +
+          "with custom log pattern", e)
+        None
+    }
+  }
+
+  def getContainerId(container: Option[Container]): ContainerId = container 
match {
+    case Some(c) => c.getId
+    case None => YarnSparkHadoopUtil.getContainerId
+  }
+
+  def getClusterId(yarnConf: YarnConfiguration): Option[String] = {
+    try {
+      Some(YarnConfiguration.getClusterId(yarnConf))
+    } catch {
+      case _: HadoopIllegalArgumentException => None
+    }
+  }
+
+  def getYarnHttpScheme(yarnConf: YarnConfiguration): String = {
+    // lookup appropriate http scheme for container log urls
+    val yarnHttpPolicy = yarnConf.get(
+      YarnConfiguration.YARN_HTTP_POLICY_KEY,
+      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+    )
+    if (yarnHttpPolicy == "HTTPS_ONLY") "https://"; else "http://";
+  }
+
+  def getNodeManagerHttpAddress(container: Option[Container]): String = 
container match {
+    case Some(c) => c.getNodeHttpAddress
+    case None => System.getenv(Environment.NM_HOST.name()) + ":" +
+      System.getenv(Environment.NM_HTTP_PORT.name())
+  }
+}
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 70ad231..b3c5bbd 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -28,7 +28,10 @@ import scala.io.Source
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.HadoopIllegalArgumentException
+import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -39,11 +42,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher._
-import org.apache.spark.scheduler.{SparkListener, 
SparkListenerApplicationStart,
-  SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListener, 
SparkListenerApplicationStart, SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.tags.ExtendedYarnTest
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, YarnContainerInfoHelper}
 
 /**
  * Integration tests for YARN; these tests use a mini Yarn cluster to run 
Spark-on-YARN
@@ -332,6 +334,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
 private[spark] class SaveExecutorInfo extends SparkListener {
   val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
   var driverLogs: Option[collection.Map[String, String]] = None
+  var driverAttributes: Option[collection.Map[String, String]] = None
 
   override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
     addedExecutorInfos(executor.executorId) = executor.executorInfo
@@ -339,6 +342,7 @@ private[spark] class SaveExecutorInfo extends SparkListener 
{
 
   override def onApplicationStart(appStart: SparkListenerApplicationStart): 
Unit = {
     driverLogs = appStart.driverLogs
+    driverAttributes = appStart.driverAttributes
   }
 }
 
@@ -443,6 +447,7 @@ private object YarnClusterDriver extends Logging with 
Matchers {
             s"Executor logs contain sensitive info (${SECRET_PASSWORD}): 
\n${log} "
           )
         }
+        assert(info.attributes.nonEmpty)
       }
 
       // If we are running in yarn-cluster mode, verify that driver logs links 
and present and are
@@ -461,9 +466,24 @@ private object YarnClusterDriver extends Logging with 
Matchers {
             s"Driver logs contain sensitive info (${SECRET_PASSWORD}): 
\n${log} "
           )
         }
-        val containerId = YarnSparkHadoopUtil.getContainerId
+
+        val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
+        val containerId = YarnContainerInfoHelper.getContainerId(container = 
None)
         val user = Utils.getCurrentUserName()
+
         
assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
+
+        assert(listener.driverAttributes.nonEmpty)
+        val driverAttributes = listener.driverAttributes.get
+        val expectationAttributes = Map(
+          "HTTP_SCHEME" -> YarnContainerInfoHelper.getYarnHttpScheme(yarnConf),
+          "NM_HTTP_ADDRESS" -> 
YarnContainerInfoHelper.getNodeManagerHttpAddress(container = None),
+          "CLUSTER_ID" -> 
YarnContainerInfoHelper.getClusterId(yarnConf).getOrElse(""),
+          "CONTAINER_ID" -> ConverterUtils.toString(containerId),
+          "USER" -> user,
+          "LOG_FILES" -> "stderr,stdout")
+
+        assert(driverAttributes === expectationAttributes)
       }
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to