This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ae5b2a6 [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS ae5b2a6 is described below commit ae5b2a6a92be4986ef5b8062d7fb59318cff6430 Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Wed Jan 30 11:52:30 2019 -0800 [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS ## What changes were proposed in this pull request? This patch proposes adding a new configuration to the SHS: a custom executor log URL pattern. This will enable end users to point executor log links at something other than what the RM provides, such as an external log service, which makes it possible to serve executor logs even when the NodeManager becomes unavailable (in the case of YARN). End users can build their own custom executor log URLs from pre-defined patterns, which may vary by resource manager. This patch adds some patterns for the YARN resource manager. (For other resource managers, executor log URLs are not even available, so no patterns can be defined for them.) Please refer to the doc change as well as the added UTs in this patch to see how to set up the feature. ## How was this patch tested? Added UTs, as well as a manual test with a YARN cluster. Closes #23260 from HeartSaVioR/SPARK-26311. 
Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../main/scala/org/apache/spark/SparkContext.scala | 3 +- .../spark/deploy/history/FsHistoryProvider.scala | 7 +- .../deploy/history/HistoryAppStatusStore.scala | 133 +++++++++++++ .../executor/CoarseGrainedExecutorBackend.scala | 9 +- .../org/apache/spark/internal/config/History.scala | 18 ++ .../apache/spark/scheduler/SchedulerBackend.scala | 7 + .../org/apache/spark/scheduler/SparkListener.scala | 3 +- .../cluster/CoarseGrainedClusterMessage.scala | 3 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 +- .../spark/scheduler/cluster/ExecutorData.scala | 5 +- .../spark/scheduler/cluster/ExecutorInfo.scala | 12 +- .../scheduler/local/LocalSchedulerBackend.scala | 3 +- .../apache/spark/status/AppStatusListener.scala | 2 + .../scala/org/apache/spark/status/LiveEntity.scala | 4 +- .../scala/org/apache/spark/status/api/v1/api.scala | 3 +- .../scala/org/apache/spark/util/JsonProtocol.scala | 16 +- .../executor_list_json_expectation.json | 3 +- ...ist_with_executor_metrics_json_expectation.json | 24 ++- ...utor_process_tree_metrics_json_expectation.json | 6 +- .../executor_memory_usage_expectation.json | 15 +- .../executor_node_blacklisting_expectation.json | 15 +- ...de_blacklisting_unblacklisting_expectation.json | 15 +- .../spark/ExecutorAllocationManagerSuite.scala | 16 +- .../org/apache/spark/HeartbeatReceiverSuite.scala | 4 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 4 +- .../deploy/history/FsHistoryProviderSuite.scala | 209 ++++++++++++++++++++- .../scheduler/EventLoggingListenerSuite.scala | 3 +- .../spark/status/AppStatusListenerSuite.scala | 7 +- .../org/apache/spark/util/JsonProtocolSuite.scala | 10 +- docs/monitoring.md | 24 +++ docs/running-on-yarn.md | 32 +++- project/MimaExcludes.scala | 6 + .../mesos/MesosFineGrainedSchedulerBackend.scala | 2 +- .../MesosCoarseGrainedSchedulerBackendSuite.scala | 3 +- 
.../MesosFineGrainedSchedulerBackendSuite.scala | 15 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 31 ++- .../cluster/YarnClusterSchedulerBackend.scala | 36 +--- .../spark/util/YarnContainerInfoHelper.scala | 103 ++++++++++ .../spark/deploy/yarn/YarnClusterSuite.scala | 28 ++- 39 files changed, 717 insertions(+), 126 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f2b79b8..6273601 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2355,7 +2355,8 @@ class SparkContext(config: SparkConf) extends Logging { // Note: this code assumes that the task scheduler has been initialized and has contacted // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), - startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) + startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls, + schedulerBackend.getDriverAttributes)) _driverLogger.foreach(_.startSync(_hadoopConfiguration)) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 33e89c39..56aea0c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -359,10 +359,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) return None } - val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name, - HistoryServer.getAttemptURI(appId, attempt.info.attemptId), - attempt.info.startTime.getTime(), - attempt.info.appSparkVersion) + val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, 
secManager, + app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime(), attempt.info.appSparkVersion) loadPlugins().foreach(_.setupUI(ui)) val loadedUI = LoadedAppUI(ui) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala new file mode 100644 index 0000000..751382c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.atomic.AtomicBoolean + +import scala.util.matching.Regex + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.History._ +import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1 +import org.apache.spark.util.kvstore.KVStore + +private[spark] class HistoryAppStatusStore( + conf: SparkConf, + store: KVStore) + extends AppStatusStore(store, None) with Logging { + + import HistoryAppStatusStore._ + + private val logUrlPattern: Option[String] = { + val appInfo = super.applicationInfo() + val applicationCompleted = appInfo.attempts.nonEmpty && appInfo.attempts.head.completed + if (applicationCompleted || conf.get(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP)) { + conf.get(CUSTOM_EXECUTOR_LOG_URL) + } else { + None + } + } + + private val informedForMissingAttributes = new AtomicBoolean(false) + + override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { + val execList = super.executorList(activeOnly) + logUrlPattern match { + case Some(pattern) => execList.map(replaceLogUrls(_, pattern)) + case None => execList + } + } + + override def executorSummary(executorId: String): v1.ExecutorSummary = { + val execSummary = super.executorSummary(executorId) + logUrlPattern match { + case Some(pattern) => replaceLogUrls(execSummary, pattern) + case None => execSummary + } + } + + private def replaceLogUrls(exec: v1.ExecutorSummary, urlPattern: String): v1.ExecutorSummary = { + val attributes = exec.attributes + + // Relation between pattern {{FILE_NAME}} and attribute {{LOG_FILES}} + // Given that HistoryAppStatusStore don't know which types of log files can be provided + // from resource manager, we require resource manager to provide available types of log + // files, which are encouraged to be same as types of log files provided in original log URLs. 
+ // Once we get the list of log files, we need to expose them to end users as a pattern + // so that end users can compose custom log URL(s) including log file name(s). + val allPatterns = CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet + val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME") + val allAttributeKeys = attributes.keySet + val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != "LOG_FILES") + + if (allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) { + logFailToRenewLogUrls("some of required attributes are missing in app's event log.", + allPatternsExceptFileName, allAttributeKeys) + return exec + } else if (allPatterns.contains("FILE_NAME") && !allAttributeKeys.contains("LOG_FILES")) { + logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file information is " + + "missing in app's event log.", allPatternsExceptFileName, allAttributeKeys) + return exec + } + + val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case (orig, patt) => + // we already checked the existence of attribute when comparing keys + orig.replace(s"{{$patt}}", attributes(patt)) + } + + val newLogUrlMap = if (allPatterns.contains("FILE_NAME")) { + // allAttributeKeys should contain "LOG_FILES" + attributes("LOG_FILES").split(",").map { file => + file -> updatedUrl.replace("{{FILE_NAME}}", file) + }.toMap + } else { + Map("log" -> updatedUrl) + } + + replaceExecutorLogs(exec, newLogUrlMap) + } + + private def logFailToRenewLogUrls( + reason: String, + allPatterns: Set[String], + allAttributes: Set[String]): Unit = { + if (informedForMissingAttributes.compareAndSet(false, true)) { + logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " + + s"available: $allAttributes. 
Falling back to show app's original log urls.") + } + } + + private def replaceExecutorLogs( + source: v1.ExecutorSummary, + newExecutorLogs: Map[String, String]): v1.ExecutorSummary = { + new v1.ExecutorSummary(source.id, source.hostPort, source.isActive, source.rddBlocks, + source.memoryUsed, source.diskUsed, source.totalCores, source.maxTasks, source.activeTasks, + source.failedTasks, source.completedTasks, source.totalTasks, source.totalDuration, + source.totalGCTime, source.totalInputBytes, source.totalShuffleRead, + source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime, + source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics, + source.blacklistedInStages, source.peakMemoryMetrics, source.attributes) + } + +} + +private[spark] object HistoryAppStatusStore { + val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r +} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 48d3630..2865c3b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -60,7 +60,8 @@ private[spark] class CoarseGrainedExecutorBackend( rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) - ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) + ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, + extractAttributes)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => @@ -76,6 +77,12 @@ private[spark] class CoarseGrainedExecutorBackend( .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)) } + def extractAttributes: 
Map[String, String] = { + val prefix = "SPARK_EXECUTOR_ATTRIBUTE_" + sys.env.filterKeys(_.startsWith(prefix)) + .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)) + } + override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index e7d25bf..1d73f01 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -125,4 +125,22 @@ private[spark] object History { val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab") .stringConf .createOptional + + val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.history.custom.executor.log.url") + .doc("Specifies custom spark executor log url for supporting external log service instead of " + + "using cluster managers' application log urls in the history server. Spark will support " + + "some path variables via patterns which can vary on cluster manager. Please check the " + + "documentation for your cluster manager to see which patterns are supported, if any. " + + "This configuration has no effect on a live application, it only affects the history server.") + .stringConf + .createOptional + + val APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP = + ConfigBuilder("spark.history.custom.executor.log.url.applyIncompleteApplication") + .doc("Whether to apply custom executor log url, as specified by " + + "`spark.history.custom.executor.log.url`, to incomplete application as well. 
" + + "Even if this is true, this still only affects the behavior of the history server, " + + "not running spark applications.") + .booleanConf + .createWithDefault(true) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index c187ee1..9159d2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -70,6 +70,13 @@ private[spark] trait SchedulerBackend { def getDriverLogUrls: Option[Map[String, String]] = None /** + * Get the attributes on driver. These attributes are used to replace log URLs when + * custom log url pattern is specified. + * @return Map containing attributes on driver. + */ + def getDriverAttributes: Option[Map[String, String]] = None + + /** * Get the max number of tasks that can be concurrent launched currently. * Note that please don't cache the value returned by this method, because the number can change * due to add/remove executors. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index e92b8a2..b332c0f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -192,7 +192,8 @@ case class SparkListenerApplicationStart( time: Long, sparkUser: String, appAttemptId: Option[String], - driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent + driverLogs: Option[Map[String, String]] = None, + driverAttributes: Option[Map[String, String]] = None) extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 9e768c2..afb48a31 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -63,7 +63,8 @@ private[spark] object CoarseGrainedClusterMessages { executorRef: RpcEndpointRef, hostname: String, cores: Int, - logUrls: Map[String, String]) + logUrls: Map[String, String], + attributes: Map[String, String]) extends CoarseGrainedClusterMessage case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0111b83..fb28e90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -183,7 +183,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => + case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) => if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) @@ -207,7 +207,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val data = new ExecutorData(executorRef, executorAddress, hostname, - cores, cores, logUrls) + cores, cores, logUrls, attributes) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index b25a4bf..ebe1c1e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -34,5 +34,6 @@ private[cluster] class ExecutorData( override val executorHost: String, var freeCores: Int, override val totalCores: Int, - override val logUrlMap: Map[String, String] -) extends ExecutorInfo(executorHost, totalCores, logUrlMap) + override val logUrlMap: Map[String, String], + override val attributes: Map[String, String] +) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index 7f21856..3197e06 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -26,7 +26,12 @@ import org.apache.spark.annotation.DeveloperApi class ExecutorInfo( val executorHost: String, val totalCores: Int, - val logUrlMap: Map[String, String]) { + val logUrlMap: Map[String, String], + val attributes: Map[String, String]) { + + def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = { + this(executorHost, totalCores, logUrlMap, Map.empty) + } def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] @@ -35,12 +40,13 @@ class ExecutorInfo( (that canEqual this) && executorHost == that.executorHost && totalCores == that.totalCores && - logUrlMap == that.logUrlMap + logUrlMap == that.logUrlMap && + attributes == that.attributes case _ => false } override def hashCode(): Int = { - val state = Seq(executorHost, totalCores, logUrlMap) + val state = Seq(executorHost, totalCores, logUrlMap, attributes) state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 6ff8bf2..fde2a32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -129,7 +129,8 @@ private[spark] class LocalSchedulerBackend( listenerBus.post(SparkListenerExecutorAdded( System.currentTimeMillis, executorEndpoint.localExecutorId, - new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))) + new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty, + Map.empty))) launcherBackend.setAppId(appId) launcherBackend.setState(SparkAppHandle.State.RUNNING) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 
1067cdc..f69c7dd 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -131,6 +131,7 @@ private[spark] class AppStatusListener( .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER)) driver.foreach { d => d.executorLogs = logs.toMap + d.attributes = event.driverAttributes.getOrElse(Map.empty).toMap update(d, System.nanoTime()) } } @@ -190,6 +191,7 @@ private[spark] class AppStatusListener( exec.totalCores = event.executorInfo.totalCores exec.maxTasks = event.executorInfo.totalCores / coresPerTask exec.executorLogs = event.executorInfo.logUrlMap + exec.attributes = event.executorInfo.attributes liveUpdate(exec, System.nanoTime()) } diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 7f7b83a..6d7b34a 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -258,6 +258,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE var blacklistedInStages: Set[Int] = TreeSet() var executorLogs = Map[String, String]() + var attributes = Map[String, String]() // Memory metrics. They may not be recorded (e.g. old event logs) so if totalOnHeap is not // initialized, the store will not contain this information. 
@@ -306,7 +307,8 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE executorLogs, memoryMetrics, blacklistedInStages, - Some(peakExecutorMetrics).filter(_.isSet)) + Some(peakExecutorMetrics).filter(_.isSet), + attributes) new ExecutorSummaryWrapper(info) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 825fc54..60b8bf8 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -106,7 +106,8 @@ class ExecutorSummary private[spark]( val blacklistedInStages: Set[Int], @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) - val peakMemoryMetrics: Option[ExecutorMetrics]) + val peakMemoryMetrics: Option[ExecutorMetrics], + val attributes: Map[String, String]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 3370152..d84dd58 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -211,7 +211,8 @@ private[spark] object JsonProtocol { ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) ~ ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~ - ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) + ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) ~ + ("Driver Attributes" -> applicationStart.driverAttributes.map(mapToJson).getOrElse(JNothing)) } def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { @@ -486,7 +487,8 @@ private[spark] object JsonProtocol { def executorInfoToJson(executorInfo: 
ExecutorInfo): JValue = { ("Host" -> executorInfo.executorHost) ~ ("Total Cores" -> executorInfo.totalCores) ~ - ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) + ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~ + ("Attributes" -> mapToJson(executorInfo.attributes)) } def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = { @@ -693,7 +695,9 @@ private[spark] object JsonProtocol { val sparkUser = (json \ "User").extract[String] val appAttemptId = jsonOption(json \ "App Attempt ID").map(_.extract[String]) val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson) - SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs) + val driverAttributes = jsonOption(json \ "Driver Attributes").map(mapFromJson) + SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs, + driverAttributes) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { @@ -1061,7 +1065,11 @@ private[spark] object JsonProtocol { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] val logUrls = mapFromJson(json \ "Log Urls").toMap - new ExecutorInfo(executorHost, totalCores, logUrls) + val attributes = jsonOption(json \ "Attributes") match { + case Some(attr) => mapFromJson(attr).toMap + case None => Map.empty[String, String] + } + new ExecutorInfo(executorHost, totalCores, logUrls, attributes) } def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = { diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json index 7bb8fe8..209164e 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json @@ -20,5 +20,6 @@ "maxMemory" : 278302556, "addTime" : "2015-02-03T16:43:00.906GMT", 
"executorLogs" : { }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json index 7567477..f282d483 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json @@ -44,7 +44,8 @@ "ProcessTreePythonRSSMemory": 0, "ProcessTreeOtherVMemory": 0, "ProcessTreeOtherRSSMemory": 0 - } + }, + "attributes" : { } }, { "id" : "7", "hostPort" : "node6340.grid.company.com:5933", @@ -76,7 +77,8 @@ "totalOnHeapStorageMemory" : 956615884, "totalOffHeapStorageMemory" : 0 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "6", "hostPort" : "node6644.grid.company.com:8445", @@ -108,7 +110,8 @@ "totalOnHeapStorageMemory" : 956615884, "totalOffHeapStorageMemory" : 0 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "5", "hostPort" : "node2477.grid.company.com:20123", @@ -140,7 +143,8 @@ "totalOnHeapStorageMemory" : 956615884, "totalOffHeapStorageMemory" : 0 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "4", "hostPort" : "node4243.grid.company.com:16084", @@ -190,7 +194,8 @@ "ProcessTreePythonRSSMemory": 0, "ProcessTreeOtherVMemory": 0, "ProcessTreeOtherRSSMemory": 0 - } + }, + "attributes" : { } }, { "id" : "3", "hostPort" : "node0998.grid.company.com:45265", @@ -240,7 +245,8 @@ "ProcessTreePythonRSSMemory": 0, "ProcessTreeOtherVMemory": 0, "ProcessTreeOtherRSSMemory": 0 - } + }, + "attributes" : { } }, { "id" : "2", "hostPort" : "node4045.grid.company.com:29262", @@ -290,7 +296,8 @@ "ProcessTreePythonRSSMemory": 0, 
"ProcessTreeOtherVMemory": 0, "ProcessTreeOtherRSSMemory": 0 - } + }, + "attributes" : { } }, { "id" : "1", "hostPort" : "node1404.grid.company.com:34043", @@ -340,5 +347,6 @@ "ProcessTreePythonRSSMemory": 0, "ProcessTreeOtherVMemory": 0, "ProcessTreeOtherRSSMemory": 0 - } + }, + "attributes" : { } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json index 69efefe..980ec85 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_process_tree_metrics_json_expectation.json @@ -44,7 +44,8 @@ "ProcessTreePythonRSSMemory" : 40284160, "ProcessTreeOtherVMemory" : 0, "ProcessTreeOtherRSSMemory" : 0 - } + }, + "attributes" : { } }, { "id" : "9", "hostPort" : "rezamemory-2.gce.something.com:40797", @@ -94,5 +95,6 @@ "ProcessTreePythonRSSMemory" : 69013504, "ProcessTreeOtherVMemory" : 0, "ProcessTreeOtherRSSMemory" : 0 - } + }, + "attributes" : { } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json index dd5b1dc..c7f0a86 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json @@ -26,7 +26,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "3", "hostPort" : "172.22.0.167:51485", @@ -58,7 +59,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ 
] + "blacklistedInStages" : [ ], + "attributes" : { } } ,{ "id" : "2", "hostPort" : "172.22.0.167:51487", @@ -90,7 +92,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "1", "hostPort" : "172.22.0.167:51490", @@ -122,7 +125,8 @@ "totalOnHeapStorageMemory": 384093388, "totalOffHeapStorageMemory": 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "0", "hostPort" : "172.22.0.167:51491", @@ -154,5 +158,6 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json index 3e55d3d..02a0ecd 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json @@ -26,7 +26,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "3", "hostPort" : "172.22.0.167:51485", @@ -58,7 +59,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "2", "hostPort" : "172.22.0.167:51487", @@ -90,7 +92,8 @@ "totalOnHeapStorageMemory" : 384093388, "totalOffHeapStorageMemory" : 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "1", "hostPort" : "172.22.0.167:51490", @@ -122,7 +125,8 @@ "totalOnHeapStorageMemory": 384093388, "totalOffHeapStorageMemory": 
524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "0", "hostPort" : "172.22.0.167:51491", @@ -154,5 +158,6 @@ "totalOnHeapStorageMemory": 384093388, "totalOffHeapStorageMemory": 524288000 }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } } ] diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json index e87f3e7..4d31a38 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json @@ -20,7 +20,8 @@ "maxMemory" : 384093388, "addTime" : "2016-11-15T23:20:38.836GMT", "executorLogs" : { }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "3", "hostPort" : "172.22.0.111:64543", @@ -46,7 +47,8 @@ "stdout" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout", "stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr" }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "2", "hostPort" : "172.22.0.111:64539", @@ -72,7 +74,8 @@ "stdout" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout", "stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr" }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "1", "hostPort" : "172.22.0.111:64541", @@ -98,7 +101,8 @@ "stdout" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stdout", "stderr" : 
"http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr" }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } }, { "id" : "0", "hostPort" : "172.22.0.111:64540", @@ -124,5 +128,6 @@ "stdout" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout", "stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr" }, - "blacklistedInStages" : [ ] + "blacklistedInStages" : [ ], + "attributes" : { } } ] diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index fdaea28..ce0ef2e 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -201,7 +201,7 @@ class ExecutorAllocationManagerSuite // Verify that running a task doesn't affect the target post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3))) post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) @@ -809,13 +809,13 @@ class ExecutorAllocationManagerSuite // New executors have registered post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-2", new ExecutorInfo("host2", 1, 
Map.empty))) + 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 2) @@ -842,7 +842,7 @@ class ExecutorAllocationManagerSuite post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) @@ -854,7 +854,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) @@ -862,7 +862,7 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 0) post(sc.listenerBus, SparkListenerExecutorAdded( - 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) + 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 1) @@ -1112,7 +1112,7 @@ class ExecutorAllocationManagerSuite // test setup -- job with 2 tasks, scale up to two executors assert(numExecutorsTarget(manager) === 1) manager.listener.onExecutorAdded(SparkListenerExecutorAdded( - clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) 
manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 2))) clock.advance(1000) manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis()) @@ -1120,7 +1120,7 @@ class ExecutorAllocationManagerSuite val taskInfo0 = createTaskInfo(0, 0, "executor-1") manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0)) manager.listener.onExecutorAdded(SparkListenerExecutorAdded( - clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty))) + clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty))) val taskInfo1 = createTaskInfo(1, 1, "executor-2") manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1)) assert(numExecutorsTarget(manager) === 2) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index dbe187d..061aeb3 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -174,9 +174,9 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty)) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty)) heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index a6d5066..573a496 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -497,7 +497,7 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty) + val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty) // Get "localhost" on a blacklist. val taskScheduler = mock(classOf[TaskSchedulerImpl]) @@ -621,7 +621,7 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty) + val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty) val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askSync[Boolean](message) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 74574e2..bce1733 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -47,8 +47,9 @@ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, USER_GROUPS_MAPPING} import org.apache.spark.io._ import org.apache.spark.scheduler._ +import 
org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.security.GroupMappingServiceProvider -import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.{AppStatusStore, ExecutorSummaryWrapper} import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} import org.apache.spark.util.logging.DriverLogger @@ -291,6 +292,192 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("log urls without customization") { + val conf = createTestConf() + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + execInfo -> execInfo.logUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected) + } + + test("custom log urls, including FILE_NAME") { + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = true)) + + // some of available attributes are not used in pattern which should be OK + + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + val attr = execInfo.attributes + val newLogUrlMap = attr("LOG_FILES").split(",").map { file => + val newLogUrl = getExpectedExecutorLogUrl(attr, Some(file)) + file -> newLogUrl + }.toMap + + execInfo -> newLogUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected) + } + + test("custom log urls, excluding FILE_NAME") { + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = false)) + + // some of available attributes are not used in pattern which should be OK + + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + val attr = 
execInfo.attributes + val newLogUrl = getExpectedExecutorLogUrl(attr, None) + + execInfo -> Map("log" -> newLogUrl) + }.toMap + + testHandlingExecutorLogUrl(conf, expected) + } + + test("custom log urls with invalid attribute") { + // Here we are referring to {{NON_EXISTING}}, which is not available in attributes, + // so Spark will fall back to the original log url and log a warning. + + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = true) + + "/{{NON_EXISTING}}") + + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + execInfo -> execInfo.logUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected) + } + + test("custom log urls, LOG_FILES not available while FILE_NAME is specified") { + // In this case Spark will fall back to the original log url and log a warning. + + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = true)) + + val executorInfos = (1 to 5).map( + createTestExecutorInfo("app1", "user1", _, includingLogFiles = false)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + execInfo -> execInfo.logUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected) + } + + test("custom log urls, app not finished, applyIncompleteApplication: true") { + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = true)) + .set(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP, true) + + // ensure custom log urls are applied to incomplete application + + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + val attr = execInfo.attributes + val newLogUrlMap = attr("LOG_FILES").split(",").map { file => + val newLogUrl =
getExpectedExecutorLogUrl(attr, Some(file)) + file -> newLogUrl + }.toMap + + execInfo -> newLogUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected, isCompletedApp = false) + } + + test("custom log urls, app not finished, applyIncompleteApplication: false") { + val conf = createTestConf() + .set(CUSTOM_EXECUTOR_LOG_URL, getCustomExecutorLogUrl(includeFileName = true)) + .set(APPLY_CUSTOM_EXECUTOR_LOG_URL_TO_INCOMPLETE_APP, false) + + // ensure custom log urls are NOT applied to incomplete application + + val executorInfos = (1 to 5).map(createTestExecutorInfo("app1", "user1", _)) + + val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo => + execInfo -> execInfo.logUrlMap + }.toMap + + testHandlingExecutorLogUrl(conf, expected, isCompletedApp = false) + } + + private def getCustomExecutorLogUrl(includeFileName: Boolean): String = { + val baseUrl = "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}/users/{{USER}}/containers/" + + "{{CONTAINER_ID}}" + if (includeFileName) baseUrl + "/{{FILE_NAME}}" else baseUrl + } + + private def getExpectedExecutorLogUrl( + attributes: Map[String, String], + fileName: Option[String]): String = { + val baseUrl = s"http://newhost:9999/logs/clusters/${attributes("CLUSTER_ID")}" + + s"/users/${attributes("USER")}/containers/${attributes("CONTAINER_ID")}" + + fileName match { + case Some(file) => baseUrl + s"/$file" + case None => baseUrl + } + } + + private def testHandlingExecutorLogUrl( + conf: SparkConf, + expectedLogUrlMap: Map[ExecutorInfo, Map[String, String]], + isCompletedApp: Boolean = true): Unit = { + val provider = new FsHistoryProvider(conf) + + val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) + + val executorAddedEvents = expectedLogUrlMap.keys.zipWithIndex.map { case (execInfo, idx) => + SparkListenerExecutorAdded(1 + idx, s"exec$idx", execInfo) + }.toList.sortBy(_.time) + val allEvents = List(SparkListenerApplicationStart("app1", Some("app1"), 1L, + "test", 
Some("attempt1"))) ++ executorAddedEvents ++ + (if (isCompletedApp) List(SparkListenerApplicationEnd(1000L)) else Seq()) + + writeFile(attempt1, true, None, allEvents: _*) + + updateAndCheck(provider) { list => + list.size should be (1) + list.head.attempts.size should be (1) + + list.foreach { app => + app.attempts.foreach { attempt => + val appUi = provider.getAppUI(app.id, attempt.attemptId) + appUi should not be null + val executors = appUi.get.ui.store.executorList(false).iterator + executors should not be null + + val iterForExpectation = expectedLogUrlMap.iterator + + var executorCount = 0 + while (executors.hasNext) { + val executor = executors.next() + val (expectedExecInfo, expectedLogs) = iterForExpectation.next() + + executor.hostPort should startWith(expectedExecInfo.executorHost) + executor.executorLogs should be(expectedLogs) + + executorCount += 1 + } + + executorCount should be (expectedLogUrlMap.size) + } + } + } + } + test("log cleaner") { val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) @@ -1048,6 +1235,26 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc conf } + private def createTestExecutorInfo( + appId: String, + user: String, + executorSeqNum: Int, + includingLogFiles: Boolean = true): ExecutorInfo = { + val host = s"host$executorSeqNum" + val container = s"container$executorSeqNum" + val cluster = s"cluster$executorSeqNum" + val logUrlPrefix = s"http://$host:8888/$appId/$container/origin" + + val executorLogUrlMap = Map("stdout" -> s"$logUrlPrefix/stdout", + "stderr" -> s"$logUrlPrefix/stderr") + + val extraAttributes = if (includingLogFiles) Map("LOG_FILES" -> "stdout,stderr") else Map.empty + val executorAttributes = Map("CONTAINER_ID" -> container, "CLUSTER_ID" -> cluster, + "USER" -> user) ++ extraAttributes + + new ExecutorInfo(host, 1, executorLogUrlMap, executorAttributes) + } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) extends 
FsHistoryProvider(conf, clock) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 811b9fe..40521f0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -422,7 +422,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty)) + SparkListenerExecutorAdded(0L, executorId.toString, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) } private def createExecutorRemovedEvent(executorId: Int) = { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index ede9466..356e6d1 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -135,7 +135,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { execIds.foreach { id => listener.onExecutorAdded(SparkListenerExecutorAdded(time, id, - new ExecutorInfo(s"$id.example.com", 1, Map()))) + new ExecutorInfo(s"$id.example.com", 1, Map.empty, Map.empty))) } execIds.foreach { id => @@ -685,7 +685,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val bm2 = BlockManagerId("2", "2.example.com", 84) Seq(bm1, bm2).foreach { bm => listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId, - new ExecutorInfo(bm.host, 1, Map()))) + new ExecutorInfo(bm.host, 1, Map.empty, Map.empty))) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory)) check[ExecutorSummaryWrapper](bm.executorId) { exec => 
assert(exec.info.maxMemory === maxMemory) @@ -1553,7 +1553,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { /** Create an executor added event for the specified executor Id. */ private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty)) + SparkListenerExecutorAdded(0L, executorId.toString, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) } /** Create an executor added event for the specified executor Id. */ diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c63f04d..c3ff379 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -76,13 +76,14 @@ class JsonProtocolSuite extends SparkFunSuite { BlockManagerId("Scarce", "to be counted...", 100)) val unpersistRdd = SparkListenerUnpersistRDD(12345) val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap + val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"), 42L, "Garfield", Some("appAttempt")) val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"), 42L, "Garfield", Some("appAttempt"), Some(logUrlMap)) val applicationEnd = SparkListenerApplicationEnd(42L) val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", - new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) + new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22) val executorUnblacklisted = @@ -138,13 +139,14 @@ class JsonProtocolSuite extends SparkFunSuite { 
test("Dependent Classes") { val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap + val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics( 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) - testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap)) + testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, attributes)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -1858,6 +1860,10 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Log Urls" : { | "stderr" : "mystderr", | "stdout" : "mystdout" + | }, + | "Attributes" : { + | "ContainerId" : "ct1", + | "User" : "spark" | } | } |} diff --git a/docs/monitoring.md b/docs/monitoring.md index 6bb620a..1e292d2 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -248,6 +248,30 @@ Security options for the Spark History Server are covered more detail in the written to disk will be re-used in the event of a history server restart. </td> </tr> + <tr> + <td>spark.history.custom.executor.log.url</td> + <td>(none)</td> + <td> + Specifies a custom Spark executor log URL for supporting an external log service instead of using cluster + managers' application log URLs in the history server. Spark supports some path variables via patterns + which can vary by cluster manager. Please check the documentation for your cluster manager to + see which patterns are supported, if any. This configuration has no effect on a live application; it only
+ affects the history server. + <p/> + For now, only YARN mode supports this configuration. + </td> + </tr> + <tr> + <td>spark.history.custom.executor.log.url.applyIncompleteApplication</td> + <td>false</td> + <td> + Specifies whether to apply the custom Spark executor log URL to incomplete applications as well. + If executor logs for running applications should be provided as the original log URLs, set this to `false`. + Please note that incomplete applications may include applications which didn't shut down gracefully. + Even if this is set to `true`, this configuration has no effect on a live application; it only affects the history server. + </td> + </tr> + </table> Note that in all of these UIs, the tables are sortable by clicking their headers, diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index a7a448f..c3ebadb 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -472,6 +472,36 @@ To use a custom metrics.properties for the application master and executors, upd </tr> </table> +#### Available patterns for SHS custom executor log URL + +<table class="table"> + <tr><th>Pattern</th><th>Meaning</th></tr> + <tr> + <td>{{HTTP_SCHEME}}</td> + <td>`http://` or `https://` according to YARN HTTP policy. (Configured via `yarn.http.policy`)</td> + </tr> + <tr> + <td>{{NM_HTTP_ADDRESS}}</td> + <td>HTTP URI of the node on which the container is allocated.</td> + </tr> + <tr> + <td>{{CLUSTER_ID}}</td> + <td>The cluster ID of the ResourceManager. (Configured via `yarn.resourcemanager.cluster-id`)</td> + </tr> + <tr> + <td>{{CONTAINER_ID}}</td> + <td>The ID of the container.</td> + </tr> + <tr> + <td>{{USER}}</td> + <td>`SPARK_USER` from the system environment.</td> + </tr> + <tr> + <td>{{FILE_NAME}}</td> + <td>`stdout` or `stderr`.</td> + </tr> +</table> + # Important notes - Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
@@ -659,4 +689,4 @@ do the following: - On the Spark History Server, add <code>org.apache.spark.deploy.yarn.YarnProxyRedirectFilter</code> to the list of filters in the <code>spark.ui.filters</code> configuration. -Be aware that the history server information may not be up-to-date with the application's state. +Be aware that the history server information may not be up-to-date with the application's state. \ No newline at end of file diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a13ee51..5d5d01f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -39,6 +39,12 @@ object MimaExcludes { // [SPARK-26254][CORE] Extract Hive + Kafka dependencies from Core. ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.deploy.security.HiveDelegationTokenProvider"), + // [SPARK-26311][CORE]New feature: apply custom log URL pattern for executor log URLs + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart.this"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerApplicationStart$"), + // [SPARK-25765][ML] Add training cost to BisectingKMeans summary ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel.this"), diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 192f940..c5c7842 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -328,7 +328,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( slaveIdToWorkerOffer.get(slaveId).foreach(o => listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) + new ExecutorInfo(o.host, o.cores, Map.empty, Map.empty))) ) logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index f2fd5e6..a4e9cc3 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -687,7 +687,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = { val mockEndpointRef = mock[RpcEndpointRef] val mockAddress = mock[RpcAddress] - val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty) + val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty, + Map.empty) backend.driverEndpoint.askSync[Boolean](message) } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index c9b7e6c..0b6d93f 
100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -86,7 +86,8 @@ class MesosFineGrainedSchedulerBackendSuite val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", + new ExecutorInfo("host1", 2, Map.empty, Map.empty))) val sc = mock[SparkContext] when(sc.getSparkHome()).thenReturn(Option("/spark-home")) @@ -117,7 +118,8 @@ class MesosFineGrainedSchedulerBackendSuite val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", + new ExecutorInfo("host1", 2, Map.empty, Map.empty))) val sc = mock[SparkContext] when(sc.getSparkHome()).thenReturn(Option("/spark-home")) @@ -157,7 +159,8 @@ class MesosFineGrainedSchedulerBackendSuite val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", + new ExecutorInfo("host1", 2, Map.empty, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) @@ -217,7 +220,8 @@ class MesosFineGrainedSchedulerBackendSuite val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", + new ExecutorInfo("host1", 2, Map.empty, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) @@ -308,7 +312,8 @@ class MesosFineGrainedSchedulerBackendSuite val listenerBus = mock[LiveListenerBus] listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, 
Map.empty))) + SparkListenerExecutorAdded(anyLong, "s1", + new ExecutorInfo("host1", 2, Map.empty, Map.empty))) val sc = mock[SparkContext] when(sc.executorMemory).thenReturn(100) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 77ce2f6..0b909d1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -19,11 +19,12 @@ package org.apache.spark.deploy.yarn import java.io.File import java.nio.ByteBuffer -import java.util.Collections +import java.util.{Collections, Locale} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, ListBuffer} +import org.apache.hadoop.HadoopIllegalArgumentException import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation @@ -33,13 +34,13 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.NMClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, YarnContainerInfoHelper} private[yarn] class ExecutorRunnable( container: Option[Container], @@ -220,13 +221,6 @@ private[yarn] class ExecutorRunnable( val env = new HashMap[String, String]() Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) - // lookup appropriate http scheme for container log urls - val 
yarnHttpPolicy = conf.get( - YarnConfiguration.YARN_HTTP_POLICY_KEY, - YarnConfiguration.YARN_HTTP_POLICY_DEFAULT - ) - val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" - System.getenv().asScala.filterKeys(_.startsWith("SPARK")) .foreach { case (k, v) => env(k) = v } @@ -241,15 +235,18 @@ private[yarn] class ExecutorRunnable( } } - // Add log urls + // Add log urls, as well as executor attributes container.foreach { c => - sys.env.filterKeys(_.endsWith("USER")).foreach { user => - val containerId = ConverterUtils.toString(c.getId) - val address = c.getNodeHttpAddress - val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user" + YarnContainerInfoHelper.getLogUrls(conf, Some(c)).foreach { m => + m.foreach { case (fileName, url) => + env("SPARK_LOG_URL_" + fileName.toUpperCase(Locale.ROOT)) = url + } + } - env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096" - env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096" + YarnContainerInfoHelper.getAttributes(conf, Some(c)).foreach { m => + m.foreach { case (attr, value) => + env("SPARK_EXECUTOR_ATTRIBUTE_" + attr.toUpperCase(Locale.ROOT)) = value + } } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index e6680e1..b5575a1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -17,14 +17,10 @@ package org.apache.spark.scheduler.cluster -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.conf.YarnConfiguration - import org.apache.spark.SparkContext -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.{ApplicationMaster, 
YarnSparkHadoopUtil} +import org.apache.spark.deploy.yarn.ApplicationMaster import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils +import org.apache.spark.util.YarnContainerInfoHelper private[spark] class YarnClusterSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -40,30 +36,10 @@ private[spark] class YarnClusterSchedulerBackend( } override def getDriverLogUrls: Option[Map[String, String]] = { - var driverLogs: Option[Map[String, String]] = None - try { - val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) - val containerId = YarnSparkHadoopUtil.getContainerId + YarnContainerInfoHelper.getLogUrls(sc.hadoopConfiguration, container = None) + } - val httpAddress = System.getenv(Environment.NM_HOST.name()) + - ":" + System.getenv(Environment.NM_HTTP_PORT.name()) - // lookup appropriate http scheme for container log urls - val yarnHttpPolicy = yarnConf.get( - YarnConfiguration.YARN_HTTP_POLICY_KEY, - YarnConfiguration.YARN_HTTP_POLICY_DEFAULT - ) - val user = Utils.getCurrentUserName() - val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" - val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user" - logDebug(s"Base URL for logs: $baseUrl") - driverLogs = Some(Map( - "stdout" -> s"$baseUrl/stdout?start=-4096", - "stderr" -> s"$baseUrl/stderr?start=-4096")) - } catch { - case e: Exception => - logInfo("Error while building AM log links, so AM" + - " logs link will not appear in application UI", e) - } - driverLogs + override def getDriverAttributes: Option[Map[String, String]] = { + YarnContainerInfoHelper.getAttributes(sc.hadoopConfiguration, container = None) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala new file mode 100644 index 0000000..96350cd --- /dev/null +++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.hadoop.HadoopIllegalArgumentException +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.records.{Container, ContainerId} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils + +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil +import org.apache.spark.internal.Logging + +private[spark] object YarnContainerInfoHelper extends Logging { + def getLogUrls( + conf: Configuration, + container: Option[Container]): Option[Map[String, String]] = { + try { + val yarnConf = new YarnConfiguration(conf) + + val containerId = getContainerId(container) + val user = Utils.getCurrentUserName() + val httpScheme = getYarnHttpScheme(yarnConf) + val httpAddress = getNodeManagerHttpAddress(container) + + val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user" + logDebug(s"Base URL for logs: $baseUrl") + + Some(Map( + "stdout" -> 
s"$baseUrl/stdout?start=-4096", + "stderr" -> s"$baseUrl/stderr?start=-4096")) + } catch { + case e: Exception => + logInfo("Error while building executor logs - executor logs will not be available", e) + None + } + } + + def getAttributes( + conf: Configuration, + container: Option[Container]): Option[Map[String, String]] = { + try { + val yarnConf = new YarnConfiguration(conf) + Some(Map( + "HTTP_SCHEME" -> getYarnHttpScheme(yarnConf), + "NM_HTTP_ADDRESS" -> getNodeManagerHttpAddress(container), + "CLUSTER_ID" -> getClusterId(yarnConf).getOrElse(""), + "CONTAINER_ID" -> ConverterUtils.toString(getContainerId(container)), + "USER" -> Utils.getCurrentUserName(), + "LOG_FILES" -> "stderr,stdout" + )) + } catch { + case e: Exception => + logInfo("Error while retrieving executor attributes - executor logs will not be replaced " + + "with custom log pattern", e) + None + } + } + + def getContainerId(container: Option[Container]): ContainerId = container match { + case Some(c) => c.getId + case None => YarnSparkHadoopUtil.getContainerId + } + + def getClusterId(yarnConf: YarnConfiguration): Option[String] = { + try { + Some(YarnConfiguration.getClusterId(yarnConf)) + } catch { + case _: HadoopIllegalArgumentException => None + } + } + + def getYarnHttpScheme(yarnConf: YarnConfiguration): String = { + // lookup appropriate http scheme for container log urls + val yarnHttpPolicy = yarnConf.get( + YarnConfiguration.YARN_HTTP_POLICY_KEY, + YarnConfiguration.YARN_HTTP_POLICY_DEFAULT + ) + if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" + } + + def getNodeManagerHttpAddress(container: Option[Container]): String = container match { + case Some(c) => c.getNodeHttpAddress + case None => System.getenv(Environment.NM_HOST.name()) + ":" + + System.getenv(Environment.NM_HTTP_PORT.name()) + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 70ad231..b3c5bbd 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -28,7 +28,10 @@ import scala.io.Source import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.HadoopIllegalArgumentException +import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -39,11 +42,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.launcher._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, - SparkListenerExecutorAdded} +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.tags.ExtendedYarnTest -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, YarnContainerInfoHelper} /** * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN @@ -332,6 +334,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { private[spark] class SaveExecutorInfo extends SparkListener { val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() var driverLogs: Option[collection.Map[String, String]] = None + var driverAttributes: Option[collection.Map[String, String]] = None override def onExecutorAdded(executor: SparkListenerExecutorAdded) { addedExecutorInfos(executor.executorId) = executor.executorInfo @@ -339,6 +342,7 @@ private[spark] class SaveExecutorInfo extends SparkListener { override def 
onApplicationStart(appStart: SparkListenerApplicationStart): Unit = { driverLogs = appStart.driverLogs + driverAttributes = appStart.driverAttributes } } @@ -443,6 +447,7 @@ private object YarnClusterDriver extends Logging with Matchers { s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " ) } + assert(info.attributes.nonEmpty) } // If we are running in yarn-cluster mode, verify that driver logs links and present and are @@ -461,9 +466,24 @@ private object YarnClusterDriver extends Logging with Matchers { s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " ) } - val containerId = YarnSparkHadoopUtil.getContainerId + + val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) + val containerId = YarnContainerInfoHelper.getContainerId(container = None) val user = Utils.getCurrentUserName() + assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096")) + + assert(listener.driverAttributes.nonEmpty) + val driverAttributes = listener.driverAttributes.get + val expectationAttributes = Map( + "HTTP_SCHEME" -> YarnContainerInfoHelper.getYarnHttpScheme(yarnConf), + "NM_HTTP_ADDRESS" -> YarnContainerInfoHelper.getNodeManagerHttpAddress(container = None), + "CLUSTER_ID" -> YarnContainerInfoHelper.getClusterId(yarnConf).getOrElse(""), + "CONTAINER_ID" -> ConverterUtils.toString(containerId), + "USER" -> user, + "LOG_FILES" -> "stderr,stdout") + + assert(driverAttributes === expectationAttributes) } } finally { Files.write(result, status, StandardCharsets.UTF_8) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org