Repository: spark
Updated Branches:
  refs/heads/master 1eb8389cb -> 79e45c932


[SPARK-3377] [SPARK-3610] Metrics can be accidentally aggregated / History 
server log name should not be based on user input

This PR is another solution for #2250

I'm using codahale base MetricsSystem of Spark with JMX or Graphite, and I saw 
following 2 problems.

(1) When applications which have same spark.app.name run on cluster at the same 
time, some metrics names are mixed. For instance, if 2+ application is running 
on the cluster at the same time, each application emits the same named metric 
like "SparkPi.DAGScheduler.stage.failedStages" and Graphite cannot distinguish 
the metrics is for which application.

(2) When 2+ executors run on the same machine, JVM metrics of each executors 
are mixed. For instance, 2+ executors running on the same node can emit the 
same named metric "jvm.memory" and Graphite cannot distinguish the metrics is 
from which application.

And there is an similar issue. The directory for event logs is named using 
application name.
Application name is defined by user and the name can includes illegal character 
for path names.
Further more, the directory name consists of application name and 
System.currentTimeMillis even though each application has unique Application ID 
so if we run jobs which have same name, it's difficult to identify which 
directory is for which application.

Closes #2250
Closes #1067

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #2432 from sarutak/metrics-structure-improvement2 and squashes the 
following commits:

3288b2b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
39169e4 [Kousuke Saruta] Fixed style
6570494 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
817e4f0 [Kousuke Saruta] Simplified MetricsSystem#buildRegistryName
67fa5eb [Kousuke Saruta] Unified MetricsSystem#registerSources and 
registerSinks in start
10be654 [Kousuke Saruta] Fixed style.
990c078 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
f0c7fba [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
59cc2cd [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite
f9b6fb3 [Kousuke Saruta] Modified style.
2cf8a0f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
389090d [Kousuke Saruta] Replaced taskScheduler.applicationId() with 
getApplicationId in SparkContext#postApplicationStart
ff45c89 [Kousuke Saruta] Added some test cases to MetricsSystemSuite
69c46a6 [Kousuke Saruta] Added warning logging logic to 
MetricsSystem#buildRegistryName
5cca0d2 [Kousuke Saruta] Added Javadoc comment to SparkContext#getApplicationId
16a9f01 [Kousuke Saruta] Added data types to be returned to some methods
6434b06 [Kousuke Saruta] Reverted changes related to ApplicationId
0413b90 [Kousuke Saruta] Deleted ApplicationId.java and ApplicationIdSuite.java
a42300c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
0fc1b09 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
42bea55 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
248935d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
f6af132 [Kousuke Saruta] Modified SchedulerBackend and TaskScheduler to return 
System.currentTimeMillis as an unique Application Id
1b8b53e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
97cb85c [Kousuke Saruta] Modified confliction of MimExcludes
2cdd009 [Kousuke Saruta] Modified defailt implementation of applicationId
9aadb0b [Kousuke Saruta] Modified NetworkReceiverSuite to ensure 
"executor.start()" is finished in test "network receiver life cycle"
3011efc [Kousuke Saruta] Added ApplicationIdSuite.scala
d009c55 [Kousuke Saruta] Modified ApplicationId#equals to compare appIds
dfc83fd [Kousuke Saruta] Modified ApplicationId to implement Serializable
9ff4851 [Kousuke Saruta] Modified MimaExcludes.scala to ignore 
createTaskScheduler method in SparkContext
4567ffc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
6a91b14 [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite, 
ExecutorRunnerTest and EventLoggingListenerSuite
0325caf [Kousuke Saruta] Added ApplicationId.scala
0a2fc14 [Kousuke Saruta] Modified style
eabda80 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
0f890e6 [Kousuke Saruta] Modified SparkDeploySchedulerBackend and Master to 
pass baseLogDir instead f eventLogDir
bcf25bf [Kousuke Saruta] Modified directory name for EventLogs
28d4d93 [Kousuke Saruta] Modified SparkContext and EventLoggingListener so that 
the directory for EventLogs is named same for Application ID
203634e [Kousuke Saruta] Modified comment in SchedulerBackend#applicationId and 
TaskScheduler#applicationId
424fea4 [Kousuke Saruta] Modified  the subclasses of TaskScheduler and 
SchedulerBackend so that they can return non-optional Unique Application ID
b311806 [Kousuke Saruta] Swapped last 2 arguments passed to 
CoarseGrainedExecutorBackend
8a2b6ec [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
086ee25 [Kousuke Saruta] Merge branch 'metrics-structure-improvement2' of 
github.com:sarutak/spark into metrics-structure-improvement2
e705386 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
36d2f7a [Kousuke Saruta] Added warning message for the situation we cannot get 
application id for the prefix for the name of metrics
eea6e19 [Kousuke Saruta] Modified CoarseGrainedMesosSchedulerBackend and 
MesosSchedulerBackend so that we can get Application ID
c229fbe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
e719c39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
4a93c7f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement2
4776f9e [Kousuke Saruta] Modified MetricsSystemSuite.scala
efcb6e1 [Kousuke Saruta] Modified to add application id to metrics name
2ec848a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
3ea7896 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
ead8966 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
08e627e [Kousuke Saruta] Revert "tmp"
7b67f5a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
45bd33d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
93e263a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
848819c [Kousuke Saruta] Merge branch 'metrics-structure-improvement' of 
github.com:sarutak/spark into metrics-structure-improvement
912a637 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
e4a4593 [Kousuke Saruta] tmp
3e098d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
4603a39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
fa7175b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
15f88a3 [Kousuke Saruta] Modified MetricsSystem#buildRegistryName because 
conf.get does not return null when correspondin entry is absent
6f7dcd4 [Kousuke Saruta] Modified constructor of DAGSchedulerSource and 
BlockManagerSource because the instance of SparkContext is no longer used
6fc5560 [Kousuke Saruta] Modified sourceName of ExecutorSource, 
DAGSchedulerSource and BlockManagerSource
4e057c9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark 
into metrics-structure-improvement
85ffc02 [Kousuke Saruta] Revert "Modified sourceName of ExecutorSource, 
DAGSchedulerSource and BlockManagerSource"
868e326 [Kousuke Saruta] Modified MetricsSystem to set registry name with 
unique application-id and driver/executor-id
71609f5 [Kousuke Saruta] Modified sourceName of ExecutorSource, 
DAGSchedulerSource and BlockManagerSource
55debab [Kousuke Saruta] Modified SparkContext and Executor to set 
spark.executor.id to identifiers
4180993 [Kousuke Saruta] Modified SparkContext to retain spark.unique.app.name 
property in SparkConf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79e45c93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79e45c93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79e45c93

Branch: refs/heads/master
Commit: 79e45c9323455a51f25ed9acd0edd8682b4bbb88
Parents: 1eb8389
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Fri Oct 3 13:48:56 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Fri Oct 3 13:48:56 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  52 +++++---
 .../main/scala/org/apache/spark/SparkEnv.scala  |   8 +-
 .../org/apache/spark/deploy/master/Master.scala |  12 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  16 ++-
 .../org/apache/spark/executor/Executor.scala    |   1 +
 .../apache/spark/executor/ExecutorSource.scala  |   3 +-
 .../spark/executor/MesosExecutorBackend.scala   |   3 +-
 .../apache/spark/metrics/MetricsSystem.scala    |  40 +++++-
 .../spark/scheduler/DAGSchedulerSource.scala    |   4 +-
 .../spark/scheduler/EventLoggingListener.scala  |  33 ++---
 .../spark/scheduler/SchedulerBackend.scala      |   8 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   8 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   2 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   9 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  11 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  13 +-
 .../spark/scheduler/local/LocalBackend.scala    |   3 +
 .../spark/storage/BlockManagerSource.scala      |   4 +-
 .../spark/metrics/MetricsSystemSuite.scala      | 128 ++++++++++++++++++-
 .../scheduler/EventLoggingListenerSuite.scala   |  14 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |   3 +-
 .../spark/streaming/NetworkReceiverSuite.scala  |  14 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   3 +-
 .../deploy/yarn/ExecutorRunnableUtil.scala      |   2 +
 .../spark/deploy/yarn/YarnAllocator.scala       |   2 +
 .../cluster/YarnClientSchedulerBackend.scala    |   6 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   9 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   3 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |   2 +-
 29 files changed, 331 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 979d178..97109b9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  private[spark] val isEventLogEnabled = 
conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", 
EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus
 
   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) 
that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin 
releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The metrics system for Driver need to be set spark.app.id to app ID.
+  // So it should start after we get app ID from the task scheduler and set 
spark.app.id.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, 
hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin 
releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()
 
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, 
this)
-  private val blockManagerSource = new 
BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new 
BlockManagerSource(SparkEnv.get.blockManager)
 
   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized 
and has contacted
     // the cluster manager to get an application ID (in case the cluster 
manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, 
taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, 
Some(applicationId),
       startTime, sparkUser))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 009ed64..72cac42 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
       }
 
     val metricsSystem = if (isDriver) {
+      // Don't start metrics system right now for Driver.
+      // We need to wait for the task scheduler to give us an app ID.
+      // Then we can start the metrics system.
       MetricsSystem.createMetricsSystem("driver", conf, securityManager)
     } else {
-      MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem("executor", conf, 
securityManager)
+      ms.start()
+      ms
     }
-    metricsSystem.start()
 
     // Set the sparkFiles directory, used when downloading dependencies.  In 
local mode,
     // this is a temporary directory; in distributed mode, this is the 
executor's current working

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 432b552..f98b531 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, 
RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState,
-  SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
+  ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
-    val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+
+    val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, 
app.id)
+    val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
       SparkHadoopUtil.get.newConfiguration(conf))
-    val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, 
fileSystem)
+    val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, 
fileSystem)
     val eventLogPaths = eventLogInfo.logPaths
     val compressionCodec = eventLogInfo.compressionCodec
 
     if (eventLogPaths.isEmpty) {
       // Event logging is enabled for this application, but no event logs are 
found
       val title = s"Application history not found (${app.id})"
-      var msg = s"No event logs found for application $appName in 
$eventLogDir."
+      var msg = s"No event logs found for application $appName in 
$appEventLogDir."
       logWarning(msg)
       msg += " Did you specify the correct logging directory?"
       msg = URLEncoder.encode(msg, "UTF-8")

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13af5b6..06061ed 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       executorId: String,
       hostname: String,
       cores: Int,
+      appId: String,
       workerUrl: Option[String]) {
 
     SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, 
String)]]
+      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, 
String)]] ++
+        Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 
       // Create a new ActorSystem using driver's Spark properties to run the 
backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend 
extends Logging {
 
   def main(args: Array[String]) {
     args.length match {
-      case x if x < 4 =>
+      case x if x < 5 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce 
fate-sharing with worker
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> 
<hostname> " +
-          "<cores> [<workerUrl>]")
+          "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
-      case 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, None)
-      case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+      case 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+      case x if x > 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7211ae..9bbfcdc 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,6 +74,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)
 
   // Initialize Spark environment (using system properties read above)
+  conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d672158..c4d7362 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, 
executorId: String)
 
   override val metricRegistry = new MetricRegistry()
 
-  // TODO: It would be nice to pass the application name here
-  override val sourceName = "executor.%s".format(executorId)
+  override val sourceName = "executor"
 
   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), 
new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala 
b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a42c8b4..bca0b15 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
       slaveInfo: SlaveInfo) {
     logInfo("Registered with Mesos as executor ID " + 
executorInfo.getExecutorId.getValue)
     this.driver = driver
-    val properties = Utils.deserialize[Array[(String, 
String)]](executorInfo.getData.toByteArray)
+    val properties = Utils.deserialize[Array[(String, 
String)]](executorInfo.getData.toByteArray) ++
+      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index fd316a8..5dd67b0 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
   def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
 
   metricsConfig.initialize()
-  registerSources()
-  registerSinks()
 
   def start() {
+    registerSources()
+    registerSinks()
     sinks.foreach(_.start)
   }
 
@@ -98,10 +98,39 @@ private[spark] class MetricsSystem private (
     sinks.foreach(_.report())
   }
 
+  /**
+   * Build a name that uniquely identifies each metric source.
+   * The name is structured as follows: <app ID>.<executor ID (or 
"driver")>.<source name>.
+   * If either ID is not available, this defaults to just using <source name>.
+   *
+   * @param source Metric source to be named by this method.
+   * @return An unique metric name for each combination of
+   *         application, executor/driver and metric source.
+   */
+  def buildRegistryName(source: Source): String = {
+    val appId = conf.getOption("spark.app.id")
+    val executorId = conf.getOption("spark.executor.id")
+    val defaultName = MetricRegistry.name(source.sourceName)
+
+    if (instance == "driver" || instance == "executor") {
+      if (appId.isDefined && executorId.isDefined) {
+        MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+      } else {
+        // Only Driver and Executor are set spark.app.id and spark.executor.id.
+        // For instance, Master and Worker are not related to a specific 
application.
+        val warningMsg = s"Using default name $defaultName for source because 
%s is not set."
+        if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
+        if (executorId.isEmpty) { 
logWarning(warningMsg.format("spark.executor.id")) }
+        defaultName
+      }
+    } else { defaultName }
+  }
+
   def registerSource(source: Source) {
     sources += source
     try {
-      registry.register(source.sourceName, source.metricRegistry)
+      val regName = buildRegistryName(source)
+      registry.register(regName, source.metricRegistry)
     } catch {
       case e: IllegalArgumentException => logInfo("Metrics already 
registered", e)
     }
@@ -109,8 +138,9 @@ private[spark] class MetricsSystem private (
 
   def removeSource(source: Source) {
     sources -= source
+    val regName = buildRegistryName(source)
     registry.removeMatching(new MetricFilter {
-      def matches(name: String, metric: Metric): Boolean = 
name.startsWith(source.sourceName)
+      def matches(name: String, metric: Metric): Boolean = 
name.startsWith(regName)
     })
   }
 
@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
         val source = Class.forName(classPath).newInstance()
         registerSource(source.asInstanceOf[Source])
       } catch {
-        case e: Exception => logError("Source class " + classPath + " cannot 
be instantialized", e)
+        case e: Exception => logError("Source class " + classPath + " cannot 
be instantiated", e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9494439..12668b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 import org.apache.spark.SparkContext
 import org.apache.spark.metrics.source.Source
 
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: 
SparkContext)
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
     extends Source {
   override val metricRegistry = new MetricRegistry()
-  override val sourceName = "%s.DAGScheduler".format(sc.appName)
+  override val sourceName = "DAGScheduler"
 
   metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new 
Gauge[Int] {
     override def getValue: Int = dagScheduler.failedStages.size

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 64b32ae..100c9ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,25 +43,23 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, 
Utils}
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output 
streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appId: String,
+    logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appName: String, sparkConf: SparkConf) =
-    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+    this(appId, logBaseDir, sparkConf, 
SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", 
false)
   private val shouldOverwrite = 
sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 
100) * 1024
-  private val logBaseDir = sparkConf.get("spark.eventLog.dir", 
DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", 
"_")
-    .toLowerCase + "-" + System.currentTimeMillis
-  val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-
+  val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
+  val logDirName: String = logDir.split("/").last
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, 
outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
@@ -69,13 +67,6 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
-   * Return only the unique application directory without the base directory.
-   */
-  def getApplicationLogDir(): String = {
-    name
-  }
-
-  /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression 
library is used.
    */
@@ -185,6 +176,18 @@ private[spark] object EventLoggingListener extends Logging 
{
   }
 
   /**
+   * Return a file-system-safe path to the log directory for the given 
application.
+   *
+   * @param logBaseDir A base directory for the path to the log directory for 
given application.
+   * @param appId A unique app ID.
+   * @return A path which consists of file-system-safe characters.
+   */
+  def getLogDirPath(logBaseDir: String, appId: String): String = {
+    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", 
"_").toLowerCase
+    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+  }
+
+  /**
    * Parse the event logging information associated with the logs in the given 
directory.
    *
    * Specifically, this looks for event log files, the Spark version file, the 
compression

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a0be830..992c477 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
  * machines become available and can launch tasks on them.
  */
 private[spark] trait SchedulerBackend {
+  private val appId = "spark-application-" + System.currentTimeMillis
+
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
   def isReady(): Boolean = true
 
   /**
-   * The application ID associated with the job, if any.
+   * Get an application ID associated with the job.
    *
-   * @return The application ID, or None if the backend does not provide an ID.
+   * @return An application ID
    */
-  def applicationId(): Option[String] = None
+  def applicationId(): String = appId
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1c1ce66..a129a43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -31,6 +31,8 @@ import org.apache.spark.storage.BlockManagerId
  */
 private[spark] trait TaskScheduler {
 
+  private val appId = "spark-application-" + System.currentTimeMillis
+
   def rootPool: Pool
 
   def schedulingMode: SchedulingMode
@@ -66,10 +68,10 @@ private[spark] trait TaskScheduler {
     blockManagerId: BlockManagerId): Boolean
 
   /**
-   * The application ID associated with the job, if any.
+   * Get an application ID associated with the job.
    *
-   * @return The application ID, or None if the backend does not provide an ID.
+   * @return An application ID
    */
-  def applicationId(): Option[String] = None
+  def applicationId(): String = appId
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 633e892..4dc5504 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -492,7 +492,7 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def applicationId(): Option[String] = backend.applicationId()
+  override def applicationId(): String = backend.applicationId()
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 5c5ecc8..ed209d1 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -68,9 +68,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = 
Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
-    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, 
sc.executorMemory, command,
-      appUIAddress, eventLogDir)
+      appUIAddress, sc.eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
@@ -129,7 +128,11 @@ private[spark] class SparkDeploySchedulerBackend(
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = Option(appId)
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
 
   private def waitForRegistration() = {
     registrationLock.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3161f1e..9082857 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,6 +76,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   var nextMesosTaskId = 0
 
+  @volatile var appId: String = _
+
   def newMesosTaskId(): Int = {
     val id = nextMesosTaskId
     nextMesosTaskId += 1
@@ -167,7 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, 
masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
+    appId = frameworkId.getValue
+    logInfo("Registered as framework ID " + appId)
     registeredLock.synchronized {
       isRegistered = true
       registeredLock.notifyAll()
@@ -313,4 +316,10 @@ private[spark] class CoarseMesosSchedulerBackend(
     slaveLost(d, s)
   }
 
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 4c49aa0..b117863 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => 
MesosTaskState, _}
 
 import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, 
SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
 /**
@@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
+  @volatile var appId: String = _
+
   override def start() {
     synchronized {
       classLoader = Thread.currentThread.getContextClassLoader
@@ -177,7 +179,8 @@ private[spark] class MesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, 
masterInfo: MasterInfo) {
     val oldClassLoader = setClassLoader()
     try {
-      logInfo("Registered as framework ID " + frameworkId.getValue)
+      appId = frameworkId.getValue
+      logInfo("Registered as framework ID " + appId)
       registeredLock.synchronized {
         isRegistered = true
         registeredLock.notifyAll()
@@ -372,4 +375,10 @@ private[spark] class MesosSchedulerBackend(
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = 
sc.conf.getInt("spark.default.parallelism", 8)
 
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9ea25c2..58b78f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -88,6 +88,7 @@ private[spark] class LocalActor(
 private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val 
totalCores: Int)
   extends SchedulerBackend with ExecutorBackend {
 
+  private val appId = "local-" + System.currentTimeMillis
   var localActor: ActorRef = null
 
   override def start() {
@@ -115,4 +116,6 @@ private[spark] class LocalBackend(scheduler: 
TaskSchedulerImpl, val totalCores:
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 
+  override def applicationId(): String = appId
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 49fea6d..8569c6f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 import org.apache.spark.SparkContext
 import org.apache.spark.metrics.source.Source
 
-private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: 
SparkContext)
+private[spark] class BlockManagerSource(val blockManager: BlockManager)
     extends Source {
   override val metricRegistry = new MetricRegistry()
-  override val sourceName = "%s.BlockManager".format(sc.appName)
+  override val sourceName = "BlockManager"
 
   metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new 
Gauge[Long] {
     override def getValue: Long = {

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index e42b181..3925f0c 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.metrics
 
-import org.apache.spark.metrics.source.Source
 import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.metrics.source.Source
 
-import scala.collection.mutable.ArrayBuffer
+import com.codahale.metrics.MetricRegistry
 
+import scala.collection.mutable.ArrayBuffer
 
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter with 
PrivateMethodTester{
   var filePath: String = _
@@ -39,6 +40,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter 
with PrivateMethod
 
   test("MetricsSystem with default config") {
     val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, 
securityMgr)
+    metricsSystem.start()
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
@@ -49,6 +51,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter 
with PrivateMethod
 
   test("MetricsSystem with sources add") {
     val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, 
securityMgr)
+    metricsSystem.start()
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
@@ -60,4 +63,125 @@ class MetricsSystemSuite extends FunSuite with 
BeforeAndAfter with PrivateMethod
     metricsSystem.registerSource(source)
     assert(metricsSystem.invokePrivate(sources()).length === 1)
   }
+
+  test("MetricsSystem with Driver instance") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "driver"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === s"$appId.$executorId.${source.sourceName}")
+  }
+
+  test("MetricsSystem with Driver instance and spark.app.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val executorId = "driver"
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Driver instance and spark.executor.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    conf.set("spark.app.id", appId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Executor instance") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "executor.1"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === s"$appId.$executorId.${source.sourceName}")
+  }
+
+  test("MetricsSystem with Executor instance and spark.app.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val executorId = "executor.1"
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Executor instance and spark.executor.id is not 
set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    conf.set("spark.app.id", appId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with instance which is neither Driver nor Executor") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "dummyExecutorId"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "testInstance"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, 
conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+
+    // Even if spark.app.id and spark.executor.id are set, they are not used 
for the metric name.
+    assert(metricName != s"$appId.$executorId.${source.sourceName}")
+    assert(metricName === source.sourceName)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e5315bc..3efa854 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -169,7 +169,9 @@ class EventLoggingListenerSuite extends FunSuite with 
BeforeAndAfter {
 
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     eventLogger.start()
     val logPath = new Path(eventLogger.logDir)
     assert(fileSystem.exists(logPath))
@@ -209,7 +211,9 @@ class EventLoggingListenerSuite extends FunSuite with 
BeforeAndAfter {
 
     // Verify that all information is correctly parsed before stop()
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     eventLogger.start()
     var eventLoggingInfo = 
EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -228,7 +232,9 @@ class EventLoggingListenerSuite extends FunSuite with 
BeforeAndAfter {
    */
   private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App 
(N)ever", None,
       125L, "Mickey")
@@ -408,4 +414,6 @@ object EventLoggingListenerSuite {
     }
     conf
   }
+
+  def getUniqueApplicationId = "test-" + System.currentTimeMillis
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7ab351d..48114fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -155,7 +155,8 @@ class ReplayListenerSuite extends FunSuite with 
BeforeAndAfter {
    * This child listener inherits only the event buffering functionality, but 
does not actually
    * log the events.
    */
-  private class EventMonster(conf: SparkConf) extends 
EventLoggingListener("test", conf) {
+  private class EventMonster(conf: SparkConf)
+    extends EventLoggingListener("test", "testdir", conf) {
     logger.close()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 99c8d13..eb6e88c 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -36,6 +37,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
 
     val receiver = new FakeReceiver
     val executor = new FakeReceiverSupervisor(receiver)
+    val executorStarted = new Semaphore(0)
 
     assert(executor.isAllEmpty)
 
@@ -43,6 +45,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
     val executingThread = new Thread() {
       override def run() {
         executor.start()
+        executorStarted.release(1)
         executor.awaitTermination()
       }
     }
@@ -57,6 +60,9 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
       }
     }
 
+    // Ensure executor is started
+    executorStarted.acquire()
+
     // Verify that receiver was started
     assert(receiver.onStartCalled)
     assert(executor.isReceiverStarted)
@@ -186,10 +192,10 @@ class NetworkReceiverSuite extends FunSuite with Timeouts 
{
    * An implementation of NetworkReceiver that is used for testing a 
receiver's life cycle.
    */
   class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
-    var otherThread: Thread = null
-    var receiving = false
-    var onStartCalled = false
-    var onStopCalled = false
+    @volatile var otherThread: Thread = null
+    @volatile var receiving = false
+    @volatile var onStartCalled = false
+    @volatile var onStopCalled = false
 
     def onStart() {
       otherThread = new Thread() {

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 10cbeb8..229b7a0 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appAttemptId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -83,7 +84,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, 
executorMemory, executorCores,
-      localResources)
+      appAttemptId, localResources)
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index d7a7175..5cb4753 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -43,6 +43,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
+      appId: String,
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -114,6 +115,7 @@ trait ExecutorRunnableUtil extends Logging {
       slaveId.toString,
       hostname.toString,
       executorCores.toString,
+      appId,
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4f4f1d2..e1af8d5 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -57,6 +57,7 @@ object AllocationType extends Enumeration {
 private[yarn] abstract class YarnAllocator(
     conf: Configuration,
     sparkConf: SparkConf,
+    appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
@@ -295,6 +296,7 @@ private[yarn] abstract class YarnAllocator(
             executorHostname,
             executorMemory,
             executorCores,
+            appAttemptId.getApplicationId.toString,
             securityMgr)
           launcherPool.execute(executorRunnable)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 200a308..6bb4b82 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -155,6 +155,10 @@ private[spark] class YarnClientSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * 
minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = 
Option(appId).map(_.toString())
+  override def applicationId(): String =
+    Option(appId).map(_.toString).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 39436d0..3a186cf 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,6 +48,13 @@ private[spark] class YarnClusterSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * 
minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = 
sc.getConf.getOption("spark.yarn.app.id")
+  override def applicationId(): String =
+    // In YARN Cluster mode, spark.yarn.app.id is expect to be set
+    // before user application is launched.
+    // So, if spark.yarn.app.id is not set, it is something wrong.
+    sc.getConf.getOption("spark.yarn.app.id").getOrElse {
+      logError("Application ID is not set.")
+      super.applicationId
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 833be12..0b5a92d 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -80,7 +81,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, 
executorMemory, executorCores,
-      localResources)
+      appId, localResources)
 
     logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)

http://git-wip-us.apache.org/repos/asf/spark/blob/79e45c93/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e44a8db..2bbf5d7 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -41,7 +41,7 @@ private[yarn] class YarnAllocationHandler(
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]], 
     securityMgr: SecurityManager)
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
+  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, 
securityMgr) {
 
   override protected def releaseContainer(container: Container) = {
     amClient.releaseAssignedContainer(container.getId())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to