This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 148918b5469 [SPARK-36664][CORE] Log time waiting for cluster resources
148918b5469 is described below

commit 148918b5469eb972b5d8e22d9b1cea5ca0721045
Author: Holden Karau <hka...@netflix.com>
AuthorDate: Mon Apr 25 20:42:37 2022 -0700

    [SPARK-36664][CORE] Log time waiting for cluster resources
    
    ### What changes were proposed in this pull request?
    
    Keep track of and communicate to the listener bus how long we are waiting 
for execs to be allocated from the underlying cluster manager.
    
    Replaces previous PR with GHA issues (  #35172 ) and WIP PR 
https://github.com/apache/spark/pull/34650  and #35881
    
    ### What changes were proposed in this pull request?
    
    Keep track of and communicate to the listener bus how long we are waiting 
for execs to be allocated from the underlying cluster manager.
    
    Replaces previous PR with GHA issues (  #35172 ) and WIP PR 
https://github.com/apache/spark/pull/34650
    
    ### Why are the changes needed?
    
    Sometimes the cluster manager may choke or otherwise not be able to 
allocate resources and we don't have a good way of detecting this situation 
making it difficult for the user to debug and tell apart from Spark not scaling 
up correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New field in the listener bus message for when a executor is allocated.
    
    ### How was this patch tested?
    
    New unit test in the listener suite.
    ### Why are the changes needed?
    
    Sometimes the cluster manager may choke or otherwise not be able to 
allocate resources and we don't have a good way of detecting this situation 
making it difficult for the user to debug and tell apart from Spark not scaling 
up correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New field in the listener bus message for when a executor is allocated.
    
    ### How was this patch tested?
    
    New unit test in the listener suite.
    
    Closes #36185 from 
holdenk/SPARK-36664-Log-time-waiting-for-cluster-resources-r4.
    
    Lead-authored-by: Holden Karau <hka...@netflix.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hka...@netflix.com>
---
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  67 ++++++++++-
 .../spark/scheduler/cluster/ExecutorData.scala     |   6 +-
 .../spark/scheduler/cluster/ExecutorInfo.scala     |  20 +++-
 .../scala/org/apache/spark/util/JsonProtocol.scala |  13 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       | 131 +++++++++++++++++++++
 .../scheduler/SparkListenerWithClusterSuite.scala  |   1 +
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  |   2 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  38 ++++++
 8 files changed, 266 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 13a7183a29d..61d67765c8c 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ScheduledExecutorService, 
TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, Queue}
 import scala.concurrent.Future
 
 import org.apache.hadoop.security.UserGroupInformation
@@ -82,6 +82,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private val requestedTotalExecutorsPerResourceProfile = new 
HashMap[ResourceProfile, Int]
 
+  // Profile IDs to the times that executors were requested for.
+  // The operations we do on queue are all amortized constant cost
+  // see 
https://www.scala-lang.org/api/2.13.x/scala/collection/mutable/ArrayDeque.html
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private val execRequestTimes = new HashMap[Int, Queue[(Int, Long)]]
+
   private val listenerBus = scheduler.sc.listenerBus
 
   // Executors we have requested the cluster manager to kill that have not 
died yet; maps
@@ -260,9 +266,27 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
               
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
             (info.name, new ExecutorResourceInfo(info.name, info.addresses, 
numParts))
           }
+          // If we've requested the executor figure out when we did.
+          val reqTs: Option[Long] = 
CoarseGrainedSchedulerBackend.this.synchronized {
+            execRequestTimes.get(resourceProfileId).flatMap {
+              times =>
+              times.headOption.map {
+                h =>
+                // Take off the top element
+                times.dequeue()
+                // If we requested more than one exec reduce the req count by 
1 and prepend it back
+                if (h._1 > 1) {
+                  ((h._1 - 1, h._2)) +=: times
+                }
+                h._2
+              }
+            }
+          }
+
           val data = new ExecutorData(executorRef, executorAddress, hostname,
             0, cores, logUrlHandler.applyPattern(logUrls, attributes), 
attributes,
-            resourcesInfo, resourceProfileId, registrationTs = 
System.currentTimeMillis())
+            resourcesInfo, resourceProfileId, registrationTs = 
System.currentTimeMillis(),
+            requestTs = reqTs)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
@@ -742,6 +766,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       val numExisting = 
requestedTotalExecutorsPerResourceProfile.getOrElse(defaultProf, 0)
       requestedTotalExecutorsPerResourceProfile(defaultProf) = numExisting + 
numAdditionalExecutors
       // Account for executors pending to be added or removed
+      updateExecRequestTime(defaultProf.id, numAdditionalExecutors)
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
 
@@ -780,15 +805,53 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       (scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
     }
     val response = synchronized {
+      val oldResourceProfileToNumExecutors = 
requestedTotalExecutorsPerResourceProfile.map {
+        case (rp, num) =>
+          (rp.id, num)
+      }.toMap
       this.requestedTotalExecutorsPerResourceProfile.clear()
       this.requestedTotalExecutorsPerResourceProfile ++= 
resourceProfileToNumExecutors
       this.numLocalityAwareTasksPerResourceProfileId = 
numLocalityAwareTasksPerResourceProfileId
       this.rpHostToLocalTaskCount = hostToLocalTaskCount
+      updateExecRequestTimes(oldResourceProfileToNumExecutors, 
resourceProfileIdToNumExecutors)
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }
     defaultAskTimeout.awaitResult(response)
   }
 
+  private def updateExecRequestTimes(oldProfile: Map[Int, Int], newProfile: 
Map[Int, Int]): Unit = {
+    newProfile.map {
+      case (k, v) =>
+        val delta = v - oldProfile.getOrElse(k, 0)
+        if (delta != 0) {
+          updateExecRequestTime(k, delta)
+        }
+    }
+  }
+
+  private def updateExecRequestTime(profileId: Int, delta: Int) = {
+    val times = execRequestTimes.getOrElseUpdate(profileId, Queue[(Int, 
Long)]())
+    if (delta > 0) {
+      // Add the request to the end, constant time op
+      times += ((delta, System.currentTimeMillis()))
+    } else if (delta < 0) {
+      // Consume as if |delta| had been allocated
+      var toConsume = -delta
+      // Note: it's possible that something else allocated an executor and we 
have
+      // a negative delta, we can just avoid mutating the queue.
+      while (toConsume > 0 && times.nonEmpty) {
+        val h = times.dequeue
+        if (h._1 > toConsume) {
+          // Prepend updated first req to times, constant time op
+          ((h._1 - toConsume, h._2)) +=: times
+          toConsume = 0
+        } else {
+          toConsume = toConsume - h._1
+        }
+      }
+    }
+  }
+
   /**
    * Request executors from the cluster manager by specifying the total number 
desired,
    * including existing pending and running executors.
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 86b44e83536..07236d4007f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
  * @param resourcesInfo The information of the currently available resources 
on the executor
  * @param resourceProfileId The id of the ResourceProfile being used by this 
executor
  * @param registrationTs The registration timestamp of this executor
+ * @param requestTs What time this executor was most likely requested at
  */
 private[cluster] class ExecutorData(
     val executorEndpoint: RpcEndpointRef,
@@ -42,6 +43,7 @@ private[cluster] class ExecutorData(
     override val attributes: Map[String, String],
     override val resourcesInfo: Map[String, ExecutorResourceInfo],
     override val resourceProfileId: Int,
-    val registrationTs: Long
+    val registrationTs: Long,
+    val requestTs: Option[Long]
 ) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
-  resourcesInfo, resourceProfileId)
+  resourcesInfo, resourceProfileId, Some(registrationTs), requestTs)
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index a97b08941ba..5be8950192c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -31,10 +31,19 @@ class ExecutorInfo(
     val logUrlMap: Map[String, String],
     val attributes: Map[String, String],
     val resourcesInfo: Map[String, ResourceInformation],
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    val registrationTime: Option[Long],
+    val requestTime: Option[Long]) {
 
+  def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, 
String],
+      attributes: Map[String, String], resourcesInfo: Map[String, 
ResourceInformation],
+      resourceProfileId: Int) = {
+    this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo, 
resourceProfileId,
+      None, None)
+  }
   def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, 
String]) = {
-    this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, 
DEFAULT_RESOURCE_PROFILE_ID)
+    this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, 
DEFAULT_RESOURCE_PROFILE_ID,
+      None, None)
   }
 
   def this(
@@ -42,7 +51,8 @@ class ExecutorInfo(
       totalCores: Int,
       logUrlMap: Map[String, String],
       attributes: Map[String, String]) = {
-    this(executorHost, totalCores, logUrlMap, attributes, Map.empty, 
DEFAULT_RESOURCE_PROFILE_ID)
+    this(executorHost, totalCores, logUrlMap, attributes, Map.empty, 
DEFAULT_RESOURCE_PROFILE_ID,
+      None, None)
   }
 
   def this(
@@ -52,7 +62,7 @@ class ExecutorInfo(
       attributes: Map[String, String],
       resourcesInfo: Map[String, ResourceInformation]) = {
     this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
-      DEFAULT_RESOURCE_PROFILE_ID)
+      DEFAULT_RESOURCE_PROFILE_ID, None, None)
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -72,6 +82,6 @@ class ExecutorInfo(
   override def hashCode(): Int = {
     val state = Seq(executorHost, totalCores, logUrlMap, attributes, 
resourcesInfo,
       resourceProfileId)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+    state.filter(_ != null).map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index acbd3239df2..09f1be2076c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -527,7 +527,9 @@ private[spark] object JsonProtocol {
     ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
     ("Attributes" -> mapToJson(executorInfo.attributes)) ~
     ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
-    ("Resource Profile Id" -> executorInfo.resourceProfileId)
+    ("Resource Profile Id" -> executorInfo.resourceProfileId) ~
+    ("Registration Time" -> executorInfo.registrationTime) ~
+    ("Request Time" -> executorInfo.requestTime)
   }
 
   def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
@@ -1223,8 +1225,15 @@ private[spark] object JsonProtocol {
       case Some(id) => id.extract[Int]
       case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
     }
+    val registrationTs = jsonOption(json \ "Registration Time") map { ts =>
+      ts.extract[Long]
+    }
+    val requestTs = jsonOption(json \ "Request Time") map { ts =>
+      ts.extract[Long]
+    }
+
     new ExecutorInfo(executorHost, totalCores, logUrls, attributes.toMap, 
resources.toMap,
-      resourceProfileId)
+      resourceProfileId, registrationTs, requestTs)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 4663717dc86..0acc2dea2b9 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext
@@ -189,6 +190,8 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 
   test("extra resources from executor") {
 
+    val testStartTime = System.currentTimeMillis()
+
     val execCores = 3
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, execCores)
@@ -207,6 +210,10 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     sc.resourceProfileManager.addResourceProfile(rp)
     assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val backend = 
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+    // Note we get two in default profile and one in the new rp
+    // we need to put a req time in for all of them.
+    backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
+    backend.requestExecutors(3)
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
     when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => 
{})
@@ -214,8 +221,12 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", 
"3")))
 
     var executorAddedCount: Int = 0
+    val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
     val listener = new SparkListener() {
       override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
+        // Lets check that the exec allocation times "make sense"
+        val info = executorAdded.executorInfo
+        infos += info
         executorAddedCount += 1
       }
     }
@@ -271,8 +282,128 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     }
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
+    infos.foreach { info =>
+      assert(info.requestTime.get > 0,
+        "Exec allocation and request times don't make sense")
+      assert(info.requestTime.get > testStartTime,
+        "Exec allocation and request times don't make sense")
+      assert(info.registrationTime.get > info.requestTime.get,
+        "Exec allocation and request times don't make sense")
+    }
   }
 
+  test("exec alloc decrease.") {
+
+    val testStartTime = System.currentTimeMillis()
+
+    val execCores = 3
+    val conf = new SparkConf()
+      .set(EXECUTOR_CORES, execCores)
+      .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive 
during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor 
registrations
+      .setMaster(
+      
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
+      .setAppName("test")
+    conf.set(TASK_GPU_ID.amountConf, "1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
+
+    sc = new SparkContext(conf)
+    val execGpu = new ExecutorResourceRequests().cores(1).resource(GPU, 3)
+    val taskGpu = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp = new ResourceProfile(execGpu.requests, taskGpu.requests)
+    sc.resourceProfileManager.addResourceProfile(rp)
+    assert(rp.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val backend = 
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
+    // Note we get two in default profile and one in the new rp
+    // we need to put a req time in for all of them.
+    backend.requestTotalExecutors(Map((rp.id, 1)), Map(), Map())
+    // Decrease the number of execs requested in the new rp.
+    backend.requestTotalExecutors(Map((rp.id, 0)), Map(), Map())
+    // Request execs in the default profile.
+    backend.requestExecutors(3)
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    when(mockEndpointRef.send(LaunchTask)).thenAnswer((_: InvocationOnMock) => 
{})
+
+    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", 
"3")))
+
+    var executorAddedCount: Int = 0
+    val infos = scala.collection.mutable.ArrayBuffer[ExecutorInfo]()
+    val listener = new SparkListener() {
+      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
+        // Lets check that the exec allocation times "make sense"
+        val info = executorAdded.executorInfo
+        infos += info
+        executorAddedCount += 1
+      }
+    }
+
+    sc.addSparkListener(listener)
+
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, 
Map.empty, resources,
+        rp.id))
+
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
+    val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
+    val buffer = new SerializableBuffer(bytebuffer)
+
+    var execResources = backend.getExecutorAvailableResources("1")
+    assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+
+    val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+    assert(exec3ResourceProfileId === rp.id)
+
+    val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 
0, "1",
+      "t1", 0, 1, mutable.Map.empty[String, Long],
+      mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
+      new Properties(), 1, taskResources, bytebuffer)))
+    val ts = backend.getTaskSchedulerImpl()
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], 
any[Boolean])).thenReturn(taskDescs)
+
+    backend.driverEndpoint.send(ReviveOffers)
+
+    eventually(timeout(5 seconds)) {
+      execResources = backend.getExecutorAvailableResources("1")
+      assert(execResources(GPU).availableAddrs.sorted === Array("1", "3"))
+      assert(execResources(GPU).assignedAddrs === Array("0"))
+    }
+
+    // To avoid allocating any resources immediately after releasing the 
resource from the task to
+    // make sure that `availableAddrs` below won't change
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], 
any[Boolean])).thenReturn(Seq.empty)
+    backend.driverEndpoint.send(
+      StatusUpdate("1", 1, TaskState.FINISHED, buffer, taskResources))
+
+    eventually(timeout(5 seconds)) {
+      execResources = backend.getExecutorAvailableResources("1")
+      assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
+      assert(execResources(GPU).assignedAddrs.isEmpty)
+    }
+    sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+    assert(executorAddedCount === 3)
+    infos.foreach { info =>
+      info.requestTime.map { t =>
+        assert(t > 0,
+          "Exec request times don't make sense")
+        assert(t >= testStartTime,
+          "Exec allocation and request times don't make sense")
+        assert(t <= info.registrationTime.get,
+          "Exec allocation and request times don't make sense")
+      }
+    }
+    assert(infos.filter(_.requestTime.isEmpty).length === 1,
+      "Our unexpected executor does not have a request time.")
+  }
+
+
   private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
     sc.submitJob(
       rdd,
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index c84735c9665..8b81468406b 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -52,6 +52,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite 
with LocalSparkContext
     assert(listener.addedExecutorInfo.size == 2)
     assert(listener.addedExecutorInfo("0").totalCores == 1)
     assert(listener.addedExecutorInfo("1").totalCores == 1)
+    assert(listener.addedExecutorInfo("0").registrationTime.get > 0 )
   }
 
   private class SaveExecutorInfo extends SparkListener {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 39e1470d120..da8c97e54d1 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -285,7 +285,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     knownExecs ++= Set("1", "2", "3")
 
     val execInfoRp1 = new ExecutorInfo("host1", 1, Map.empty,
-      Map.empty, Map.empty, 1)
+      Map.empty, Map.empty, 1, None, None)
 
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", execInfo))
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"2", execInfo))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 36b61f67e3b..5800dbda297 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -96,6 +96,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
       new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, 
resources.toMap, 4))
+    val executorAddedWithTime = SparkListenerExecutorAdded(executorAddedTime, 
"exec1",
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, 
resources.toMap, 4,
+        Some(1), Some(0)))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, 
"exec2", "test reason")
     val executorBlacklisted = 
SparkListenerExecutorBlacklisted(executorExcludedTime, "exec1", 22)
     val executorUnblacklisted =
@@ -155,6 +158,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(applicationStartWithLogs, applicationStartJsonWithLogUrlsString)
     testEvent(applicationEnd, applicationEndJsonString)
     testEvent(executorAdded, executorAddedJsonString)
+    testEvent(executorAddedWithTime, executorAddedWithTimeJsonString)
     testEvent(executorRemoved, executorRemovedJsonString)
     testEvent(executorBlacklisted, executorBlacklistedJsonString)
     testEvent(executorUnblacklisted, executorUnblacklistedJsonString)
@@ -173,6 +177,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("Dependent Classes") {
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap
+    val rinfo = Map[String, ResourceInformation]().toMap
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L, DeterministicLevel.DETERMINATE))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 888, 777L, false))
@@ -180,6 +185,8 @@ class JsonProtocolSuite extends SparkFunSuite {
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput 
= false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
     testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, attributes))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, attributes,
+      rinfo, 1, Some(1), Some(0)))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -2146,6 +2153,37 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |}
     """.stripMargin
 
+  private val executorAddedWithTimeJsonString =
+    s"""
+      |{
+      |  "Event": "SparkListenerExecutorAdded",
+      |  "Timestamp": ${executorAddedTime},
+      |  "Executor ID": "exec1",
+      |  "Executor Info": {
+      |    "Host": "Hostee.awesome.com",
+      |    "Total Cores": 11,
+      |    "Log Urls" : {
+      |      "stderr" : "mystderr",
+      |      "stdout" : "mystdout"
+      |    },
+      |    "Attributes" : {
+      |      "ContainerId" : "ct1",
+      |      "User" : "spark"
+      |    },
+      |    "Resources" : {
+      |      "gpu" : {
+      |        "name" : "gpu",
+      |        "addresses" : [ "0", "1" ]
+      |      }
+      |    },
+      |    "Resource Profile Id": 4,
+      |    "Registration Time" : 1,
+      |    "Request Time" : 0
+      |  }
+      |
+      |}
+    """.stripMargin
+
   private val executorRemovedJsonString =
     s"""
       |{


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to