This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b64688e  [SPARK-29303][WEB UI] Add UI support for stage level 
scheduling
b64688e is described below

commit b64688ebbaac7afd3734c0d84d1e77b1fd2d2e9d
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu May 21 13:11:35 2020 -0500

    [SPARK-29303][WEB UI] Add UI support for stage level scheduling
    
    ### What changes were proposed in this pull request?
    
    This adds UI updates to support stage level scheduling and 
ResourceProfiles. 3 main things have been added. ResourceProfile id added to 
the Stage page, the Executors page now has an optional selectable column to 
show the ResourceProfile Id of each executor, and the Environment page now has 
a section with the ResourceProfile ids.  Along with this the rest api for 
environment page was updated to include the Resource profile information.
    
    I debating on splitting the resource profile information into its own page 
but I wasn't sure it called for a completely separate page. Open to peoples 
thoughts on this.
    
    Screen shots:
    ![Screen Shot 2020-04-01 at 3 07 46 
PM](https://user-images.githubusercontent.com/4563792/78185169-469a7000-7430-11ea-8b0c-d9ede2d41df8.png)
    ![Screen Shot 2020-04-01 at 3 08 14 
PM](https://user-images.githubusercontent.com/4563792/78185175-48fcca00-7430-11ea-8d1d-6b9333700f32.png)
    ![Screen Shot 2020-04-01 at 3 09 03 
PM](https://user-images.githubusercontent.com/4563792/78185176-4a2df700-7430-11ea-92d9-73c382bb0f32.png)
    ![Screen Shot 2020-04-01 at 11 05 48 
AM](https://user-images.githubusercontent.com/4563792/78185186-4dc17e00-7430-11ea-8962-f749dd47ea60.png)
    
    ### Why are the changes needed?
    
    For user to be able to know what resource profile was used with which stage 
and executors. The resource profile information is also available so user 
debugging can see exactly what resources were requested with that profile.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, UI updates.
    
    ### How was this patch tested?
    
    Unit tests and tested on yarn both active applications and with the history 
server.
    
    Closes #28094 from tgravescs/SPARK-29303-pr.
    
    Lead-authored-by: Thomas Graves <tgra...@nvidia.com>
    Co-authored-by: Thomas Graves <tgra...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/SparkFirehoseListener.java    |   5 +
 .../spark/ui/static/executorspage-template.html    |   1 +
 .../org/apache/spark/ui/static/executorspage.js    |   7 +-
 .../main/scala/org/apache/spark/SparkContext.scala |   2 +-
 .../deploy/history/HistoryAppStatusStore.scala     |   3 +-
 .../spark/resource/ResourceProfileManager.scala    |   7 +-
 .../spark/scheduler/EventLoggingListener.scala     |   4 +
 .../org/apache/spark/scheduler/SparkListener.scala |  12 +++
 .../apache/spark/scheduler/SparkListenerBus.scala  |   2 +
 .../apache/spark/status/AppStatusListener.scala    |  33 +++++-
 .../org/apache/spark/status/AppStatusStore.scala   |   8 +-
 .../scala/org/apache/spark/status/LiveEntity.scala |  25 ++++-
 .../status/api/v1/OneApplicationResource.scala     |   4 +-
 .../scala/org/apache/spark/status/api/v1/api.scala |  18 +++-
 .../scala/org/apache/spark/status/storeTypes.scala |   7 ++
 .../org/apache/spark/ui/env/EnvironmentPage.scala  |  48 +++++++++
 .../scala/org/apache/spark/ui/jobs/JobPage.scala   |   4 +-
 .../scala/org/apache/spark/ui/jobs/StagePage.scala |   4 +
 .../scala/org/apache/spark/util/JsonProtocol.scala | 101 +++++++++++++++++--
 .../app_environment_expectation.json               |   3 +-
 .../application_list_json_expectation.json         |  15 +++
 .../blacklisting_for_stage_expectation.json        |   3 +-
 .../blacklisting_node_for_stage_expectation.json   |   3 +-
 .../complete_stage_list_json_expectation.json      |   9 +-
 .../completed_app_list_json_expectation.json       |  15 +++
 .../executor_list_json_expectation.json            |   3 +-
 ...ist_with_executor_metrics_json_expectation.json |  12 ++-
 .../executor_memory_usage_expectation.json         |  15 ++-
 .../executor_node_blacklisting_expectation.json    |  15 ++-
 ...de_blacklisting_unblacklisting_expectation.json |  15 ++-
 .../executor_resource_information_expectation.json |   9 +-
 .../failed_stage_list_json_expectation.json        |   3 +-
 .../limit_app_list_json_expectation.json           |  30 +++---
 .../minDate_app_list_json_expectation.json         |  18 +++-
 .../minEndDate_app_list_json_expectation.json      |  15 +++
 .../multiple_resource_profiles_expectation.json    | 112 +++++++++++++++++++++
 .../one_stage_attempt_json_expectation.json        |   3 +-
 .../one_stage_json_expectation.json                |   3 +-
 .../stage_list_json_expectation.json               |  12 ++-
 ...age_list_with_accumulable_json_expectation.json |   3 +-
 .../stage_with_accumulable_json_expectation.json   |   3 +-
 .../spark-events/application_1578436911597_0052    |  27 +++++
 .../spark/ExecutorAllocationManagerSuite.scala     |   2 +-
 .../spark/deploy/history/HistoryServerSuite.scala  |   1 +
 .../resource/ResourceProfileManagerSuite.scala     |  13 ++-
 .../scala/org/apache/spark/ui/StagePageSuite.scala |   3 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  94 +++++++++++++++--
 dev/.rat-excludes                                  |   1 +
 .../KubernetesClusterSchedulerBackendSuite.scala   |   5 +-
 49 files changed, 657 insertions(+), 103 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java 
b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 731f6fc..579e7ff 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -163,6 +163,11 @@ public class SparkFirehoseListener implements 
SparkListenerInterface {
   }
 
   @Override
+  public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
+    onEvent(event);
+  }
+
+  @Override
   public void onOtherEvent(SparkListenerEvent event) {
     onEvent(event);
   }
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
 
b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 0b26bfc..0729dfe 100644
--- 
a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ 
b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -89,6 +89,7 @@ limitations under the License.
           <th>Disk Used</th>
           <th>Cores</th>
           <th>Resources</th>
+          <th>Resource Profile Id</th>
           <th><span data-toggle="tooltip" data-placement="top" title="Number 
of tasks currently executing. Darker shading highlights executors with more 
active tasks.">Active Tasks</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Number 
of tasks that have failed on this executor. Darker shading highlights executors 
with a high proportion of failed tasks.">Failed Tasks</span></th>
           <th>Complete Tasks</th>
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js 
b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index ec57797..520edb9 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
 }
 
 var sumOptionalColumns = [3, 4];
-var execOptionalColumns = [5, 6, 9];
+var execOptionalColumns = [5, 6, 9, 10];
 var execDataTable;
 var sumDataTable;
 
@@ -415,6 +415,7 @@ $(document).ready(function () {
                         {data: 'diskUsed', render: formatBytes},
                         {data: 'totalCores'},
                         {name: 'resourcesCol', data: 'resources', render: 
formatResourceCells, orderable: false},
+                        {name: 'resourceProfileIdCol', data: 
'resourceProfileId'},
                         {
                             data: 'activeTasks',
                             "fnCreatedCell": function (nTd, sData, oData, 
iRow, iCol) {
@@ -461,7 +462,8 @@ $(document).ready(function () {
                     "columnDefs": [
                         {"visible": false, "targets": 5},
                         {"visible": false, "targets": 6},
-                        {"visible": false, "targets": 9}
+                        {"visible": false, "targets": 9},
+                        {"visible": false, "targets": 10}
                     ],
                     "deferRender": true
                 };
@@ -570,6 +572,7 @@ $(document).ready(function () {
                     "<div id='on_heap_memory' 
class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' 
data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
                     "<div id='off_heap_memory' 
class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' 
data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
                     "<div id='extra_resources' 
class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' 
data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
+                    "<div id='resource_prof_id' 
class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' 
data-sum-col-idx='' data-exec-col-idx='10'>Resource Profile Id</div>" +
                     "</div>");
 
                 reselectCheckboxesBasedOnTaskTableState();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5c92527..38d7319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _listenerBus = new LiveListenerBus(_conf)
-    _resourceProfileManager = new ResourceProfileManager(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus)
 
     // Initialize the app status store and listener before SparkEnv is created 
so that it gets
     // all events.
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
index 74105002..7973652 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
@@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore(
       source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
       source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, 
source.addTime,
       source.removeTime, source.removeReason, newExecutorLogs, 
source.memoryMetrics,
-      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, 
source.resources)
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, 
source.resources,
+      source.resourceProfileId)
   }
 
 }
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index c3e2444..f365548 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.scheduler.{LiveListenerBus, 
SparkListenerResourceProfileAdded}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.Utils.isTesting
 
 /**
  * Manager of resource profiles. The manager allows one place to keep the 
actual ResourceProfiles
  * and everywhere else we can use the ResourceProfile Id to save on space.
- * Note we never remove a resource profile at this point. Its expected this 
number if small
+ * Note we never remove a resource profile at this point. Its expected this 
number is small
  * so this shouldn't be much overhead.
  */
 @Evolving
-private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends 
Logging {
+private[spark] class ResourceProfileManager(sparkConf: SparkConf,
+    listenerBus: LiveListenerBus) extends Logging {
   private val resourceProfileIdToResourceProfile = new HashMap[Int, 
ResourceProfile]()
 
   private val (readLock, writeLock) = {
@@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf) extends Loggin
       // force the computation of maxTasks and limitingResource now so we 
don't have cost later
       rp.limitingResource(sparkConf)
       logInfo(s"Added ResourceProfile id: ${rp.id}")
+      listenerBus.post(SparkListenerResourceProfileAdded(rp))
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 24e2a5e..b2e9a0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
     }
   }
 
+  override def onResourceProfileAdded(event: 
SparkListenerResourceProfileAdded): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
       logEvent(event, flushLogger = true)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c150b03..62d54f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 import org.apache.spark.TaskEndReason
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 
@@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends 
SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
+  extends SparkListenerEvent
+
 /**
  * Interface for listening to events from the Spark scheduler. Most 
applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than 
implementing this class.
@@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
    * Called when other events like SQL-specific events are posted.
    */
   def onOtherEvent(event: SparkListenerEvent): Unit
+
+  /**
+   * Called when a Resource Profile is added to the manager.
+   */
+  def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
 }
 
 
@@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface 
{
       speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
+
+  override def onResourceProfileAdded(event: 
SparkListenerResourceProfileAdded): Unit = { }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 8f6b7ad..3d316c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
         listener.onBlockUpdated(blockUpdated)
       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+        listener.onResourceProfileAdded(resourceProfileAdded)
       case _ => listener.onOtherEvent(event)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c3f22f3..f7b0e9b 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -28,6 +28,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.resource.ResourceProfile.CPUS
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
@@ -51,7 +52,7 @@ private[spark] class AppStatusListener(
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
-  private var coresPerTask: Int = 1
+  private var defaultCpusPerTask: Int = 1
 
   // How often to update live entities. -1 means "never update" when replaying 
applications,
   // meaning only the last write will happen. For live applications, this 
avoids a few
@@ -76,6 +77,7 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+  private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
 
   private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to 
do synchronization
@@ -145,6 +147,20 @@ private[spark] class AppStatusListener(
     }
   }
 
+  override def onResourceProfileAdded(event: 
SparkListenerResourceProfileAdded): Unit = {
+    val maxTasks = if (event.resourceProfile.isCoresLimitKnown) {
+      Some(event.resourceProfile.maxTasksPerExecutor(conf))
+    } else {
+      None
+    }
+    val liveRP = new LiveResourceProfile(event.resourceProfile.id,
+      event.resourceProfile.executorResources, 
event.resourceProfile.taskResources, maxTasks)
+    liveResourceProfiles(event.resourceProfile.id) = liveRP
+    val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId,
+      liveRP.executorResources, liveRP.taskResources)
+    kvstore.write(new ResourceProfileWrapper(rpInfo))
+  }
+
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): 
Unit = {
     val details = event.environmentDetails
 
@@ -159,10 +175,11 @@ private[spark] class AppStatusListener(
       details.getOrElse("Spark Properties", Nil),
       details.getOrElse("Hadoop Properties", Nil),
       details.getOrElse("System Properties", Nil),
-      details.getOrElse("Classpath Entries", Nil))
+      details.getOrElse("Classpath Entries", Nil),
+      Nil)
 
-    coresPerTask = 
envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
-      .getOrElse(coresPerTask)
+    defaultCpusPerTask = 
envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
+      .getOrElse(defaultCpusPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
   }
@@ -197,10 +214,16 @@ private[spark] class AppStatusListener(
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
-    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    val rpId = event.executorInfo.resourceProfileId
+    val liveRP = liveResourceProfiles.get(rpId)
+    val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS))
+      .map(_.amount.toInt).getOrElse(defaultCpusPerTask)
+    val maxTasksPerExec = liveRP.flatMap(_.maxTasksPerExecutor)
+    exec.maxTasks = maxTasksPerExec.getOrElse(event.executorInfo.totalCores / 
cpusPerTask)
     exec.executorLogs = event.executorInfo.logUrlMap
     exec.resources = event.executorInfo.resourcesInfo
     exec.attributes = event.executorInfo.attributes
+    exec.resourceProfileId = rpId
     liveUpdate(exec, System.nanoTime())
   }
 
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 42d4071..ea033d0 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException}
+import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.Utils
@@ -51,6 +52,10 @@ private[spark] class AppStatusStore(
     store.read(klass, klass.getName()).info
   }
 
+  def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
+    store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
+  }
+
   def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
     val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
     if (statuses != null && !statuses.isEmpty()) {
@@ -486,7 +491,8 @@ private[spark] class AppStatusStore(
       accumulatorUpdates = stage.accumulatorUpdates,
       tasks = Some(tasks),
       executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
-      killedTasksSummary = stage.killedTasksSummary)
+      killedTasksSummary = stage.killedTasksSummary,
+      resourceProfileId = stage.resourceProfileId)
   }
 
   def rdd(rddId: Int): v1.RDDStorageInfo = {
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 2714f30..86cb4fe 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -28,7 +28,7 @@ import com.google.common.collect.Interners
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ExecutorResourceRequest, 
ResourceInformation, ResourceProfile, TaskResourceRequest}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
@@ -245,6 +245,21 @@ private class LiveTask(
 
 }
 
+private class LiveResourceProfile(
+    val resourceProfileId: Int,
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest],
+    val maxTasksPerExecutor: Option[Int]) extends LiveEntity {
+
+  def toApi(): v1.ResourceProfileInfo = {
+    new v1.ResourceProfileInfo(resourceProfileId, executorResources, 
taskResources)
+  }
+
+  override protected def doUpdate(): Any = {
+    new ResourceProfileWrapper(toApi())
+  }
+}
+
 private[spark] class LiveExecutor(val executorId: String, _addTime: Long) 
extends LiveEntity {
 
   var hostPort: String = null
@@ -285,6 +300,8 @@ private[spark] class LiveExecutor(val executorId: String, 
_addTime: Long) extend
   var usedOnHeap = 0L
   var usedOffHeap = 0L
 
+  var resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
   // peak values for executor level metrics
@@ -327,7 +344,8 @@ private[spark] class LiveExecutor(val executorId: String, 
_addTime: Long) extend
       blacklistedInStages,
       Some(peakExecutorMetrics).filter(_.isSet),
       attributes,
-      resources)
+      resources,
+      resourceProfileId)
     new ExecutorSummaryWrapper(info)
   }
 }
@@ -465,7 +483,8 @@ private class LiveStage extends LiveEntity {
       accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
       tasks = None,
       executorSummary = None,
-      killedTasksSummary = killedSummary)
+      killedTasksSummary = killedSummary,
+      resourceProfileId = info.resourceProfileId)
   }
 
   override protected def doUpdate(): Any = {
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
 
b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index cf5c759..e0c85fd 100644
--- 
a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -101,12 +101,14 @@ private[v1] class AbstractApplicationResource extends 
BaseAppResource {
   @Path("environment")
   def environmentInfo(): ApplicationEnvironmentInfo = withUI { ui =>
     val envInfo = ui.store.environmentInfo()
+    val resourceProfileInfo = ui.store.resourceProfileInfo()
     new v1.ApplicationEnvironmentInfo(
       envInfo.runtime,
       Utils.redact(ui.conf, envInfo.sparkProperties),
       Utils.redact(ui.conf, envInfo.hadoopProperties),
       Utils.redact(ui.conf, envInfo.systemProperties),
-      envInfo.classpathEntries)
+      envInfo.classpathEntries,
+      resourceProfileInfo)
   }
 
   @GET
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36..e89e291 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -30,7 +30,7 @@ import 
com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ExecutorResourceRequest, 
ResourceInformation, TaskResourceRequest}
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -62,6 +62,11 @@ case class ApplicationAttemptInfo private[spark](
 
 }
 
+class ResourceProfileInfo private[spark](
+    val id: Int,
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest])
+
 class ExecutorStageSummary private[spark](
     val taskTime : Long,
     val failedTasks : Int,
@@ -109,7 +114,8 @@ class ExecutorSummary private[spark](
     @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
     val peakMemoryMetrics: Option[ExecutorMetrics],
     val attributes: Map[String, String],
-    val resources: Map[String, ResourceInformation])
+    val resources: Map[String, ResourceInformation],
+    val resourceProfileId: Int)
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,
@@ -252,7 +258,8 @@ class StageData private[spark](
     val accumulatorUpdates: Seq[AccumulableInfo],
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
-    val killedTasksSummary: Map[String, Int])
+    val killedTasksSummary: Map[String, Int],
+    val resourceProfileId: Int)
 
 class TaskData private[spark](
     val taskId: Long,
@@ -365,12 +372,15 @@ class AccumulableInfo private[spark](
 class VersionInfo private[spark](
   val spark: String)
 
+// Note the resourceProfiles information are only added here on return from the
+// REST call, they are not stored with it.
 class ApplicationEnvironmentInfo private[spark] (
     val runtime: RuntimeInfo,
     val sparkProperties: Seq[(String, String)],
     val hadoopProperties: Seq[(String, String)],
     val systemProperties: Seq[(String, String)],
-    val classpathEntries: Seq[(String, String)])
+    val classpathEntries: Seq[(String, String)],
+    val resourceProfiles: Seq[ResourceProfileInfo])
 
 class RuntimeInfo private[spark](
     val javaVersion: String,
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala 
b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index c957ff7..b40f730 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -374,6 +374,13 @@ private[spark] class RDDStorageInfoWrapper(val info: 
RDDStorageInfo) {
 
 }
 
+private[spark] class ResourceProfileWrapper(val rpInfo: ResourceProfileInfo) {
+
+  @JsonIgnore @KVIndex
+  def id: Int = rpInfo.id
+
+}
+
 private[spark] class ExecutorStageSummaryWrapper(
     val stageId: Int,
     val stageAttemptId: Int,
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala 
b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index c6eb461..2f5b731 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -19,9 +19,11 @@ package org.apache.spark.ui.env
 
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.mutable.StringBuilder
 import scala.xml.Node
 
 import org.apache.spark.SparkConf
+import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui._
 import org.apache.spark.util.Utils
@@ -38,6 +40,37 @@ private[ui] class EnvironmentPage(
       "Java Home" -> appEnv.runtime.javaHome,
       "Scala Version" -> appEnv.runtime.scalaVersion)
 
+    def constructExecutorRequestString(execReqs: Map[String, 
ExecutorResourceRequest]): String = {
+      execReqs.map {
+        case (_, execReq) =>
+          val execStr = new StringBuilder(s"\t${execReq.resourceName}: 
[amount: ${execReq.amount}")
+          if (execReq.discoveryScript.nonEmpty) {
+            execStr ++= s", discovery: ${execReq.discoveryScript}"
+          }
+          if (execReq.vendor.nonEmpty) {
+            execStr ++= s", vendor: ${execReq.vendor}"
+          }
+          execStr ++= "]"
+          execStr.toString()
+      }.mkString("\n")
+    }
+
+    def constructTaskRequestString(taskReqs: Map[String, 
TaskResourceRequest]): String = {
+      taskReqs.map {
+        case (_, taskReq) => s"\t${taskReq.resourceName}: [amount: 
${taskReq.amount}]"
+      }.mkString("\n")
+    }
+
+    val resourceProfileInfo = store.resourceProfileInfo().map { rinfo =>
+      val einfo = constructExecutorRequestString(rinfo.executorResources)
+      val tinfo = constructTaskRequestString(rinfo.taskResources)
+      val res = s"Executor Reqs:\n$einfo\nTask Reqs:\n$tinfo"
+      (rinfo.id.toString, res)
+    }.toMap
+
+    val resourceProfileInformationTable = 
UIUtils.listingTable(resourceProfileHeader,
+      jvmRowDataPre, resourceProfileInfo.toSeq.sortWith(_._1.toInt < 
_._1.toInt),
+      fixedWidth = true, headerClasses = headerClassesNoSortValues)
     val runtimeInformationTable = UIUtils.listingTable(
       propertyHeader, jvmRow, jvmInformation.toSeq.sorted, fixedWidth = true,
       headerClasses = headerClasses)
@@ -77,6 +110,17 @@ private[ui] class EnvironmentPage(
         <div class="aggregated-sparkProperties collapsible-table">
           {sparkPropertiesTable}
         </div>
+        <span class="collapse-aggregated-execResourceProfileInformation 
collapse-table"
+              
onClick="collapseTable('collapse-aggregated-execResourceProfileInformation',
+            'aggregated-execResourceProfileInformation')">
+          <h4>
+            <span class="collapse-table-arrow arrow-open"></span>
+            <a>Resource Profiles</a>
+          </h4>
+        </span>
+        <div class="aggregated-execResourceProfileInformation 
collapsible-table">
+          {resourceProfileInformationTable}
+        </div>
         <span class="collapse-aggregated-hadoopProperties collapse-table"
               onClick="collapseTable('collapse-aggregated-hadoopProperties',
             'aggregated-hadoopProperties')">
@@ -115,10 +159,14 @@ private[ui] class EnvironmentPage(
     UIUtils.headerSparkPage(request, "Environment", content, parent)
   }
 
+  private def resourceProfileHeader = Seq("Resource Profile Id", "Resource 
Profile Contents")
   private def propertyHeader = Seq("Name", "Value")
   private def classPathHeader = Seq("Resource", "Source")
   private def headerClasses = Seq("sorttable_alpha", "sorttable_alpha")
+  private def headerClassesNoSortValues = Seq("sorttable_numeric", 
"sorttable_nosort")
 
+  private def jvmRowDataPre(kv: (String, String)) =
+    <tr><td>{kv._1}</td><td><pre>{kv._2}</pre></td></tr>
   private def jvmRow(kv: (String, String)) = 
<tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def propertyRow(kv: (String, String)) = 
<tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def classPathRow(data: (String, String)) = 
<tr><td>{data._1}</td><td>{data._2}</td></tr>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 9be7124..542dc39 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility}
 import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui._
@@ -253,7 +254,8 @@ private[ui] class JobPage(parent: JobsTab, store: 
AppStatusStore) extends WebUIP
           accumulatorUpdates = Nil,
           tasks = None,
           executorSummary = None,
-          killedTasksSummary = Map())
+          killedTasksSummary = Map(),
+          ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7973d30..1b07227 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -143,6 +143,10 @@ private[ui] class StagePage(parent: StagesTab, store: 
AppStatusStore) extends We
       <div>
         <ul class="list-unstyled">
           <li>
+            <strong>Resource Profile Id: </strong>
+            {stageData.resourceProfileId}
+          </li>
+          <li>
             <strong>Total Time Across All Tasks: </strong>
             {UIUtils.formatDuration(stageData.executorRunTime)}
           </li>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9254ac9..26bbff5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.{ExecutorResourceRequest, 
ResourceInformation, ResourceProfile, TaskResourceRequest}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
@@ -105,6 +105,8 @@ private[spark] object JsonProtocol {
         stageExecutorMetricsToJson(stageExecutorMetrics)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
+      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+        resourceProfileAddedToJson(resourceProfileAdded)
       case _ => parse(mapper.writeValueAsString(event))
     }
   }
@@ -224,6 +226,15 @@ private[spark] object JsonProtocol {
     ("Timestamp" -> applicationEnd.time)
   }
 
+  def resourceProfileAddedToJson(profileAdded: 
SparkListenerResourceProfileAdded): JValue = {
+    ("Event" -> 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.resourceProfileAdded) ~
+      ("Resource Profile Id" -> profileAdded.resourceProfile.id) ~
+      ("Executor Resource Requests" ->
+        
executorResourceRequestMapToJson(profileAdded.resourceProfile.executorResources))
 ~
+      ("Task Resource Requests" ->
+        
taskResourceRequestMapToJson(profileAdded.resourceProfile.taskResources))
+  }
+
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = 
{
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
     ("Timestamp" -> executorAdded.time) ~
@@ -297,7 +308,8 @@ private[spark] object JsonProtocol {
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
-    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
+    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values)) ~
+    ("Resource Profile Id" -> stageInfo.resourceProfileId)
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -500,7 +512,8 @@ private[spark] object JsonProtocol {
     ("Total Cores" -> executorInfo.totalCores) ~
     ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
     ("Attributes" -> mapToJson(executorInfo.attributes)) ~
-    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo))
+    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
+    ("Resource Profile Id" -> executorInfo.resourceProfileId)
   }
 
   def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
@@ -518,6 +531,34 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> blockUpdatedInfo.diskSize)
   }
 
+  def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue 
= {
+    ("Resource Name" -> execReq.resourceName) ~
+    ("Amount" -> execReq.amount) ~
+    ("Discovery Script" -> execReq.discoveryScript) ~
+    ("Vendor" -> execReq.vendor)
+  }
+
+  def executorResourceRequestMapToJson(m: Map[String, 
ExecutorResourceRequest]): JValue = {
+    val jsonFields = m.map {
+      case (k, execReq) =>
+        JField(k, executorResourceRequestToJson(execReq))
+    }
+    JObject(jsonFields.toList)
+  }
+
+  def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
+    ("Resource Name" -> taskReq.resourceName) ~
+    ("Amount" -> taskReq.amount)
+  }
+
+  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]): 
JValue = {
+    val jsonFields = m.map {
+      case (k, taskReq) =>
+        JField(k, taskResourceRequestToJson(taskReq))
+    }
+    JObject(jsonFields.toList)
+  }
+
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
@@ -577,6 +618,7 @@ private[spark] object JsonProtocol {
     val metricsUpdate = 
Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
     val stageExecutorMetrics = 
Utils.getFormattedClassName(SparkListenerStageExecutorMetrics)
     val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
+    val resourceProfileAdded = 
Utils.getFormattedClassName(SparkListenerResourceProfileAdded)
   }
 
   def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@@ -602,6 +644,7 @@ private[spark] object JsonProtocol {
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
       case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
+      case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
       case other => mapper.readValue(compact(render(json)), 
Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
@@ -678,6 +721,45 @@ private[spark] object JsonProtocol {
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
+  def resourceProfileAddedFromJson(json: JValue): 
SparkListenerResourceProfileAdded = {
+    val profId = (json \ "Resource Profile Id").extract[Int]
+    val executorReqs = executorResourceRequestMapFromJson(json \ "Executor 
Resource Requests")
+    val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource 
Requests")
+    val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
+    rp.setResourceProfileId(profId)
+    SparkListenerResourceProfileAdded(rp)
+  }
+
+  def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = 
{
+    val rName = (json \ "Resource Name").extract[String]
+    val amount = (json \ "Amount").extract[Int]
+    val discoveryScript = (json \ "Discovery Script").extract[String]
+    val vendor = (json \ "Vendor").extract[String]
+    new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
+  }
+
+  def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
+    val rName = (json \ "Resource Name").extract[String]
+    val amount = (json \ "Amount").extract[Int]
+    new TaskResourceRequest(rName, amount)
+  }
+
+  def taskResourceRequestMapFromJson(json: JValue): Map[String, 
TaskResourceRequest] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, v) =>
+      val req = taskResourceRequestFromJson(v)
+      (k, req)
+    }.toMap
+  }
+
+  def executorResourceRequestMapFromJson(json: JValue): Map[String, 
ExecutorResourceRequest] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, v) =>
+      val req = executorResourceRequestFromJson(v)
+      (k, req)
+    }.toMap
+  }
+
   def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate 
= {
     // For compatible with previous event logs
     val hadoopProperties = jsonOption(json \ "Hadoop 
Properties").map(mapFromJson(_).toSeq)
@@ -804,9 +886,10 @@ private[spark] object JsonProtocol {
       }
     }
 
-    val stageInfo = new StageInfo(
-      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details,
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val rpId = jsonOption(json \ "Resource Profile Id").map(_.extract[Int])
+    val stageProf = rpId.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, 
rddInfos,
+      parentIds, details, resourceProfileId = stageProf)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
@@ -1109,7 +1192,11 @@ private[spark] object JsonProtocol {
       case Some(resources) => resourcesMapFromJson(resources).toMap
       case None => Map.empty[String, ResourceInformation]
     }
-    new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources)
+    val resourceProfileId = jsonOption(json \ "Resource Profile Id") match {
+      case Some(id) => id.extract[Int]
+      case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+    }
+    new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources, 
resourceProfileId)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
diff --git 
a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
index a646172..0b617a7 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
@@ -282,5 +282,6 @@
     [ 
"/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-jobclient-2.2.0.jar",
 "System Classpath" ],
     [ 
"/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sketch_2.11-2.1.0-SNAPSHOT.jar",
 "System Classpath" ],
     [ 
"/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-model-1.2.15.jar",
 "System Classpath" ]
-  ]
+  ],
+  "resourceProfiles" : [ ]
 }
diff --git 
a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 6e6d28b..d2b3d1b 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git 
a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
index b18b19f..0d197ea 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
@@ -717,5 +717,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git 
a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
index 8d11081..24d73fa 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
@@ -876,5 +876,6 @@
       "isBlacklistedForStage" : true
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git 
a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
index a47cd26..a452488 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
@@ -41,7 +41,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 6, 5 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 1,
@@ -85,7 +86,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 1, 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 0,
@@ -129,5 +131,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 6e6d28b..d2b3d1b 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index eadf271..6742567 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -22,5 +22,6 @@
   "executorLogs" : { },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
index d322485..d052a27 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
@@ -50,7 +50,8 @@
     "MajorGCTime" : 144
   },
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "test-3.vpc.company.com:37641",
@@ -116,7 +117,8 @@
     "NM_HOST" : "test-3.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000004"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "test-4.vpc.company.com:33179",
@@ -182,7 +184,8 @@
     "NM_HOST" : "test-4.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000003"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "test-2.vpc.company.com:43764",
@@ -248,5 +251,6 @@
     "NM_HOST" : "test-2.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000002"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
index 7c3f77d..91574ca 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -62,7 +63,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ,{
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -96,7 +98,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -130,7 +133,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -164,5 +168,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 0986e85..f14b9a5 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -62,7 +63,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -96,7 +98,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -130,7 +133,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -164,5 +168,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
index 26d6651..3645387 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
@@ -22,7 +22,8 @@
   "executorLogs" : { },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.111:64543",
@@ -50,7 +51,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.111:64539",
@@ -78,7 +80,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.111:64541",
@@ -106,7 +109,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.111:64540",
@@ -134,5 +138,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
index e69ab3b..165389c 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "tomg-test:46005",
@@ -77,7 +78,8 @@
       "name" : "gpu",
       "addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", 
"11", "12" ]
     }
-  }
+  },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "tomg-test:44873",
@@ -126,5 +128,6 @@
       "name" : "gpu",
       "addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", 
"11", "12" ]
     }
-  }
+  },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
index da26271..c387416 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
@@ -42,5 +42,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 3, 2 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 3102909..82489e9 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "endTimeEpoch" : 1554756046454,
     "lastUpdatedEpoch" : 0
   } ]
-}, {
-  "id" : "application_1516285256255_0012",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2018-01-18T18:30:35.119GMT",
-    "endTime" : "2018-01-18T18:38:27.938GMT",
-    "lastUpdated" : "",
-    "duration" : 472819,
-    "sparkUser" : "attilapiros",
-    "completed" : true,
-    "appSparkVersion" : "2.3.0-SNAPSHOT",
-    "lastUpdatedEpoch" : 0,
-    "startTimeEpoch" : 1516300235119,
-    "endTimeEpoch" : 1516300707938
-  } ]
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 794f151..ac2bb0e 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,5 +1,19 @@
-[
-  {
+[ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
     "id": "application_1555004656427_0144",
     "name": "Spark shell",
     "attempts": [
diff --git 
a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index adcdcce..1561676 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git 
a/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json
new file mode 100644
index 0000000..5c1e4cc
--- /dev/null
+++ 
b/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json
@@ -0,0 +1,112 @@
+{
+  "runtime" : {
+    "javaVersion" : "1.8.0_232 (Private Build)",
+    "javaHome" : "/usr/lib/jvm/java-8-openjdk-amd64/jre",
+    "scalaVersion" : "version 2.12.10"
+  },
+  "sparkProperties" : [ ],
+  "hadoopProperties" : [ ],
+  "systemProperties" : [ ],
+  "classpathEntries" : [ ],
+  "resourceProfiles" : [ {
+    "id" : 0,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 1,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "memory" : {
+        "resourceName" : "memory",
+        "amount" : 1024,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "/home/tgraves/getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 1.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  }, {
+    "id" : 1,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 4,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "./getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 1.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  }, {
+    "id" : 2,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 2,
+        "discoveryScript" : "",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 2.0
+      }
+    }
+  }, {
+    "id" : 3,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 4,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "./getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 2.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  } ]
+}
diff --git 
a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
index 7919070..3db7d55 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
@@ -462,5 +462,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git 
a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
index 50d3f74..8ef3769 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
@@ -462,5 +462,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
index edbac71..a31c907 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
@@ -41,7 +41,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 6, 5 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "FAILED",
   "stageId" : 2,
@@ -86,7 +87,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 3, 2 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 1,
@@ -130,7 +132,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 1, 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 0,
@@ -174,5 +177,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
index 836f2cb..08089d4 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
@@ -45,5 +45,6 @@
     "name" : "my counter",
     "value" : "5050"
   } ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git 
a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
 
b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
index 735a825..3b5476a 100644
--- 
a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
+++ 
b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
@@ -506,5 +506,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git 
a/core/src/test/resources/spark-events/application_1578436911597_0052 
b/core/src/test/resources/spark-events/application_1578436911597_0052
new file mode 100644
index 0000000..c57481a
--- /dev/null
+++ b/core/src/test/resources/spark-events/application_1578436911597_0052
@@ -0,0 +1,27 @@
+{"Event":"SparkListenerLogStart","Spark Version":"3.0.0-SNAPSHOT"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor 
Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery 
Script":"","Vendor":""},"memory":{"Resource 
Name":"memory","Amount":1024,"Discovery 
Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery 
Script":"/home/tgraves/getGpus","Vendor":""}},"Task Resource 
Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0},"gpu":{"Resource 
Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor 
ID":"driver","Host":"10.10.10.10","Port":32957},"Maximum 
Memory":428762726,"Timestamp":1578764671818,"Maximum Onheap 
Memory":428762726,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java 
Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre","Java Version":"1.8.0_232 
(Private Build)","Scala Version":"version 2.12.10"},"Spark 
Properties":{},"Hadoop Properties":{},"System Properties":{}, "Classpath 
Entries": {}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App 
ID":"application_1578436911597_0052","Timestamp":1578764662851,"User":"tgraves"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":1,"Executor 
Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery 
Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery 
Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource 
Name":"cpus","Amount":1.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":2,"Executor 
Resource Requests":{"cores":{"Resource Name":"cores","Amount":2,"Discovery 
Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource 
Name":"cpus","Amount":2.0}}}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":3,"Executor 
Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery 
Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery 
Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource 
Name":"cpus","Amount":2.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission 
Time":1578764765274,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage 
Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD 
ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map
 at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use 
Memory":false,"Deserialized":false,"Replication":1},"Number of 
Partitions":6,"Number of Cached Partitions":0,"Memory Size":0," [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage 
Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD 
Info":[{"RDD 
ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map
 at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use 
Memory":false,"Deserialized":false,"Replication":1},"Number of 
Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk 
Size":0},{"RDD ID":0,"Name":"Paral [...]
+{"Event":"SparkListenerExecutorAdded","Timestamp":1578764769706,"Executor 
ID":"1","Executor Info":{"Host":"host1","Total Cores":4,"Log 
Urls":{"stdout":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stdout?start=-4096","stderr":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host1:8042","USER":"tgraves","LOG_FILES":"stderr,stdout","NM_HTTP_PORT":"8042","CLUSTER_
 [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor 
ID":"1","Host":"host1","Port":40787},"Maximum 
Memory":384093388,"Timestamp":1578764769796,"Maximum Onheap 
Memory":384093388,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770512,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":2,"Internal":true,"Count
 Failed  [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770515,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":4,"Internal":true,"Count
 Failed  [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770526,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":11064,"Internal":true,"Count
 Failed Values [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task 
Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770529,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3620,"Value":14684,"Internal":true,"Count
 Failed Values [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":18320,"Internal":true,"Count
 Failed Values [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task 
Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task 
ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor 
ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting 
Result Time":0,"Finish 
Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":21956,"Internal":true,"Count
 Failed Values [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage 
Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD 
Info":[{"RDD 
ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map
 at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use 
Memory":false,"Deserialized":false,"Replication":1},"Number of 
Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk 
Size":0},{"RDD ID":0,"Name":"Paral [...]
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1578764770546,"Job 
Result":{"Result":"JobSucceeded"}}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1578764802615}
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 807f0eb..8037f4a 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1442,7 +1442,7 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
       conf: SparkConf,
       clock: Clock = new SystemClock()): ExecutorAllocationManager = {
     ResourceProfile.reInitDefaultProfile(conf)
-    rpManager = new ResourceProfileManager(conf)
+    rpManager = new ResourceProfileManager(conf, listenerBus)
     val manager = new ExecutorAllocationManager(client, listenerBus, conf, 
clock = clock,
       resourceProfileManager = rpManager)
     managers += manager
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index c55b29b..8c2dfff 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -171,6 +171,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
     "executor node blacklisting unblacklisting" -> 
"applications/app-20161115172038-0000/executors",
     "executor memory usage" -> 
"applications/app-20161116163331-0000/executors",
     "executor resource information" -> 
"applications/application_1555004656427_0144/executors",
+    "multiple resource profiles" -> 
"applications/application_1578436911597_0052/environment",
 
     "app environment" -> "applications/app-20161116163331-0000/environment",
 
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 004618a..f452173 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.resource
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.scheduler.LiveListenerBus
 
 class ResourceProfileManagerSuite extends SparkFunSuite {
 
@@ -39,9 +40,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     }
   }
 
+  val listenerBus = new LiveListenerBus(new SparkConf())
+
   test("ResourceProfileManager") {
     val conf = new SparkConf().set(EXECUTOR_CORES, 4)
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     val defaultProf = rpmanager.defaultResourceProfile
     assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assert(defaultProf.executorResources.size === 2,
@@ -53,7 +56,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
   test("isSupported yarn no dynamic allocation") {
     val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -71,7 +74,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
     conf.set(DYN_ALLOCATION_ENABLED, true)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -84,7 +87,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
   test("isSupported yarn with local mode") {
     val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -100,7 +103,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
 
   test("ResourceProfileManager has equivalent profile") {
     val conf = new SparkConf().set(EXECUTOR_CORES, 4)
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     var rpAlreadyExist: Option[ResourceProfile] = None
     val checkId = 500
     for (i <- 1 to 1000) {
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 7711934..5d34a56 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -92,7 +92,8 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
         accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, 
"value")),
         tasks = None,
         executorSummary = None,
-        killedTasksSummary = Map.empty
+        killedTasksSummary = Map.empty,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
       )
       val taskTable = new TaskPagedTable(
         stageData,
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index eb7f307..bc7f8b5 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceUtils}
+import org.apache.spark.resource._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
@@ -92,7 +92,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, 
resources.toMap))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, 
resources.toMap, 4))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, 
"exec2", "test reason")
     val executorBlacklisted = 
SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
     val executorUnblacklisted =
@@ -119,6 +119,14 @@ class JsonProtocolSuite extends SparkFunSuite {
       SparkListenerStageExecutorMetrics("1", 2, 3,
         new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 
432L,
           321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L, 
10L, 90L, 2L, 20L)))
+    val rprofBuilder = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
+    rprofBuilder.require(taskReq).require(execReq)
+    val resourceProfile = rprofBuilder.build
+    resourceProfile.setResourceProfileId(21)
+    val resourceProfileAdded = 
SparkListenerResourceProfileAdded(resourceProfile)
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
     testEvent(taskStart, taskStartJsonString)
@@ -144,6 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
     testEvent(blockUpdated, blockUpdatedJsonString)
     testEvent(stageExecutorMetrics, stageExecutorMetricsJsonString)
+    testEvent(resourceProfileAdded, resourceProfileJsonString)
   }
 
   test("Dependent Classes") {
@@ -231,6 +240,20 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(0 === newInfo.accumulables.size)
   }
 
+  test("StageInfo resourceProfileId") {
+    val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
+    val json = JsonProtocol.stageInfoToJson(info)
+
+    // Fields added after 1.0.0.
+    assert(info.details.nonEmpty)
+    assert(info.resourceProfileId === 5)
+
+    val newInfo = JsonProtocol.stageInfoFromJson(json)
+
+    assert(info.name === newInfo.name)
+    assert(5 === newInfo.resourceProfileId)
+  }
+
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, 
hasOutput = false)
@@ -865,6 +888,10 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
     assert(ste1.getFileName === ste2.getFileName)
   }
 
+  private def assertEquals(rp1: ResourceProfile, rp2: ResourceProfile): Unit = 
{
+    assert(rp1 === rp2)
+  }
+
   /** ----------------------------------- *
    | Util methods for constructing events |
    * ------------------------------------ */
@@ -895,10 +922,16 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
     r
   }
 
-  private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
+  private def makeStageInfo(
+      a: Int,
+      b: Int,
+      c: Int,
+      d: Long,
+      e: Long,
+      rpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, 
d + i, e + i) }
     val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 
200, 300), "details",
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      resourceProfileId = rpId)
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -1034,7 +1067,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |        "Internal": false,
       |        "Count Failed Values": false
       |      }
-      |    ]
+      |    ],
+      |    "Resource Profile Id" : 0
       |  },
       |  "Properties": {
       |    "France": "Paris",
@@ -1091,7 +1125,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |        "Internal": false,
       |        "Count Failed Values": false
       |      }
-      |    ]
+      |    ],
+      |    "Resource Profile Id" : 0
       |  }
       |}
     """.stripMargin
@@ -1613,7 +1648,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 2,
@@ -1673,7 +1709,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 3,
@@ -1749,7 +1786,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 4,
@@ -1841,7 +1879,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    }
       |  ],
       |  "Stage IDs": [
@@ -1988,7 +2027,8 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |        "name" : "gpu",
       |        "addresses" : [ "0", "1" ]
       |      }
-      |    }
+      |    },
+      |    "Resource Profile Id": 4
       |  }
       |}
     """.stripMargin
@@ -2334,6 +2374,38 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "hostId" : "node1"
       |}
     """.stripMargin
+  private val resourceProfileJsonString =
+    """
+      |{
+      |  "Event":"SparkListenerResourceProfileAdded",
+      |  "Resource Profile Id":21,
+      |  "Executor Resource Requests":{
+      |    "cores" : {
+      |      "Resource Name":"cores",
+      |      "Amount":2,
+      |      "Discovery Script":"",
+      |      "Vendor":""
+      |    },
+      |    "gpu":{
+      |      "Resource Name":"gpu",
+      |      "Amount":2,
+      |      "Discovery Script":"myscript",
+      |      "Vendor":""
+      |    }
+      |  },
+      |  "Task Resource Requests":{
+      |    "cpus":{
+      |      "Resource Name":"cpus",
+      |      "Amount":1.0
+      |    },
+      |    "gpu":{
+      |      "Resource Name":"gpu",
+      |      "Amount":1.0
+      |    }
+      |  }
+      |}
+    """.stripMargin
+
 }
 
 case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index f997c9b..4faaaec 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -124,3 +124,4 @@ vote.tmpl
 SessionManager.java
 SessionHandler.java
 GangliaReporter.java
+application_1578436911597_0052
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 8c683e8..894e1e4 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, 
TaskSchedulerImpl}
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import 
org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
@@ -87,7 +87,8 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
   private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
-  private val resourceProfileManager = new ResourceProfileManager(sparkConf)
+  private val listenerBus = new LiveListenerBus(new SparkConf())
+  private val resourceProfileManager = new ResourceProfileManager(sparkConf, 
listenerBus)
 
   before {
     MockitoAnnotations.initMocks(this)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to