GitHub user vanzin commented on a diff in the pull request:
    --- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -0,0 +1,690 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.status
    +import java.util.{Date, Properties}
    +import scala.collection.JavaConverters._
    +import scala.reflect.{classTag, ClassTag}
    +import org.scalatest.BeforeAndAfter
    +import org.apache.spark._
    +import org.apache.spark.executor.TaskMetrics
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster._
    +import org.apache.spark.status.api.v1
    +import org.apache.spark.util.Utils
    +import org.apache.spark.util.kvstore._
    +class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    +  private var time: Long = _
    +  private var testDir: File = _
    +  private var store: KVStore = _
    +  before {
    +    time = 0L
    +    testDir = Utils.createTempDir()
    +    store =, getClass().getName())
    +  }
    +  after {
    +    store.close()
    +    Utils.deleteRecursively(testDir)
    +  }
    +  test("scheduler events") {
    +    val listener = new AppStatusListener(store)
    +    // Start the application.
    +    time += 1
    +    listener.onApplicationStart(SparkListenerApplicationStart(
    +      "name",
    +      Some("id"),
    +      time,
    +      "user",
    +      Some("attempt"),
    +      None))
    +    check[ApplicationInfoWrapper]("id") { app =>
    +      assert( === "name")
    +      assert( === "id")
    +      assert( === 1)
    +      val attempt =
    +      assert(attempt.attemptId === Some("attempt"))
    +      assert(attempt.startTime === new Date(time))
    +      assert(attempt.lastUpdated === new Date(time))
    +      assert(attempt.endTime.getTime() === -1L)
    +      assert(attempt.sparkUser === "user")
    +      assert(!attempt.completed)
    +    }
    +    // Start a couple of executors.
    +    time += 1
    +    val execIds = Array("1", "2")
    +    execIds.foreach { id =>
    +      listener.onExecutorAdded(SparkListenerExecutorAdded(time, id,
    +        new ExecutorInfo(s"$", 1, Map())))
    +    }
    +    execIds.foreach { id =>
    +      check[ExecutorSummaryWrapper](id) { exec =>
    +        assert( === id)
    +        assert( === s"$")
    +        assert(
    +      }
    +    }
    +    // Start a job with 2 stages / 4 tasks each
    +    time += 1
    +    val stages = Seq(
    +      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
    +      new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
    +    val jobProps = new Properties()
    +    jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
    +    jobProps.setProperty("spark.scheduler.pool", "schedPool")
    +    listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +      assert( ===
    +      assert( === None)
    +      assert( === JobExecutionStatus.RUNNING)
    +      assert( === Some(new Date(time)))
    +      assert( === Some("jobGroup"))
    +    }
    +    stages.foreach { info =>
    +      check[StageDataWrapper](key(info)) { stage =>
    +        assert( === v1.StageStatus.PENDING)
    +        assert( === "schedPool")
    +        assert(stage.jobIds === Set(1))
    +      }
    +    }
    +    // Submit stage 1
    +    time += 1
    +    stages.head.submissionTime = Some(time)
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, 
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === v1.StageStatus.ACTIVE)
    +      assert( === Some(new 
    +    }
    +    // Start tasks from stage 1
    +    time += 1
    +    var _taskIdTracker = -1L
    +    def nextTaskId(): Long = {
    +      _taskIdTracker += 1
    +      _taskIdTracker
    +    }
    +    def createTasks(count: Int, time: Long): Seq[TaskInfo] = {
    +      (1 to count).map { id =>
    +        val exec = execIds(id.toInt % execIds.length)
    +        val taskId = nextTaskId()
    +        new TaskInfo(taskId, taskId.toInt, 1, time, exec, 
    +          TaskLocality.PROCESS_LOCAL, id % 2 == 0)
    +      }
    +    }
    +    val s1Tasks = createTasks(4, time)
    +    s1Tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, 
stages.head.attemptId, task))
    +    }
    +    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
    +    check[JobDataWrapper](1) { job =>
    +      assert( === s1Tasks.size)
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === s1Tasks.size)
    +      assert( === Some(new 
    +    }
    +    s1Tasks.foreach { task =>
    +      check[TaskDataWrapper](task.taskId) { wrapper =>
    +        assert( === task.taskId)
    +        assert( === task.index)
    +        assert( === task.attemptNumber)
    +        assert( === new Date(task.launchTime))
    +        assert( === task.executorId)
    +        assert( ===
    +        assert( === task.status)
    +        assert( === task.taskLocality.toString())
    +        assert( === task.speculative)
    +      }
    +    }
    +    // Send executor metrics update. Only update one metric to avoid a lot 
of boilerplate code.
    +    s1Tasks.foreach { task =>
    +      val accum = new AccumulableInfo(1L, 
    +        Some(1L), None, true, false, None)
    +      listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
    +        task.executorId,
    +        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, 
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === s1Tasks.size)
    +      val execs = 
    +        .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
    +      assert(execs.size > 0)
    +      execs.foreach { exec =>
    +        assert( === s1Tasks.size / 2)
    +      }
    +    }
    +    // Fail one of the tasks, re-start it.
    +    time += 1
    +    s1Tasks.head.markFinished(TaskState.FAILED, time)
    +    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, 
    +      "taskType", TaskResultLost, s1Tasks.head, null))
    +    time += 1
    +    val reattempt = {
    +      val orig = s1Tasks.head
    +      // Task reattempts have a different ID, but the same index as the 
    +      new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, 
    +        s"${orig.executorId}", TaskLocality.PROCESS_LOCAL, 
    +    }
    +    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, 
    +      reattempt))
    +    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +      assert( === s1Tasks.size)
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === 1)
    +      assert( === s1Tasks.size)
    +    }
    +    check[TaskDataWrapper](s1Tasks.head.taskId) { task =>
    +      assert( === s1Tasks.head.status)
    +      assert( === Some(s1Tasks.head.duration))
    +      assert( == Some(TaskResultLost.toErrorString))
    +    }
    +    check[TaskDataWrapper](reattempt.taskId) { task =>
    +      assert( === s1Tasks.head.index)
    +      assert( === reattempt.attemptNumber)
    +    }
    +    // Succeed all tasks in stage 1.
    +    val pending = s1Tasks.drop(1) ++ Seq(reattempt)
    +    val s1Metrics = TaskMetrics.empty
    +    s1Metrics.setExecutorCpuTime(2L)
    +    s1Metrics.setExecutorRunTime(4L)
    +    time += 1
    +    pending.foreach { task =>
    +      task.markFinished(TaskState.FINISHED, time)
    +      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, 
    +        "taskType", Success, task, s1Metrics))
    +    }
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +      assert( === 0)
    +      assert( === pending.size)
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === 1)
    +      assert( === 0)
    +      assert( === pending.size)
    +    }
    +    pending.foreach { task =>
    +      check[TaskDataWrapper](task.taskId) { wrapper =>
    +        assert( === None)
    +        assert( === 2L)
    +        assert( === 4L)
    +      }
    +    }
    +    assert(store.count(classOf[TaskDataWrapper]) === pending.size + 1)
    +    // End stage 1.
    +    time += 1
    +    stages.head.completionTime = Some(time)
    +    listener.onStageCompleted(SparkListenerStageCompleted(stages.head))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 0)
    +      assert( === 1)
    +    }
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert( === v1.StageStatus.COMPLETE)
    +      assert( === 1)
    +      assert( === 0)
    +      assert( === pending.size)
    +    }
    +    // Submit stage 2.
    +    time += 1
    +    stages.last.submissionTime = Some(time)
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, 
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +    }
    +    check[StageDataWrapper](key(stages.last)) { stage =>
    +      assert( === v1.StageStatus.ACTIVE)
    +      assert( === Some(new 
    +    }
    +    // Start and fail all tasks of stage 2.
    +    time += 1
    +    val s2Tasks = createTasks(4, time)
    +    s2Tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, 
stages.last.attemptId, task))
    +    }
    +    time += 1
    +    s2Tasks.foreach { task =>
    +      task.markFinished(TaskState.FAILED, time)
    +      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, 
    +        "taskType", TaskResultLost, task, null))
    +    }
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1 + s2Tasks.size)
    +      assert( === 0)
    +    }
    +    check[StageDataWrapper](key(stages.last)) { stage =>
    +      assert( === s2Tasks.size)
    +      assert( === 0)
    +    }
    +    // Fail stage 2.
    +    time += 1
    +    stages.last.completionTime = Some(time)
    +    stages.last.failureReason = Some("uh oh")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stages.last))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 1)
    +      assert( === 1)
    +    }
    +    check[StageDataWrapper](key(stages.last)) { stage =>
    +      assert( === v1.StageStatus.FAILED)
    +      assert( === s2Tasks.size)
    +      assert( === 0)
    +      assert( === 0)
    +    }
    +    // - Re-submit stage 2, all tasks, and succeed them and the stage.
    +    val oldS2 = stages.last
    +    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1,, oldS2.numTasks,
    +      oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
    +    time += 1
    +    newS2.submissionTime = Some(time)
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps))
    +    assert(store.count(classOf[StageDataWrapper]) === 3)
    +    val newS2Tasks = createTasks(4, time)
    +    newS2Tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, 
newS2.attemptId, task))
    +    }
    +    time += 1
    +    newS2Tasks.foreach { task =>
    +      task.markFinished(TaskState.FINISHED, time)
    +      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, 
newS2.attemptId, "taskType", Success,
    +        task, null))
    +    }
    +    time += 1
    +    newS2.completionTime = Some(time)
    +    listener.onStageCompleted(SparkListenerStageCompleted(newS2))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 0)
    +      assert( === 1)
    +      assert( === 2)
    +    }
    +    check[StageDataWrapper](key(newS2)) { stage =>
    +      assert( === v1.StageStatus.COMPLETE)
    +      assert( === 0)
    +      assert( === newS2Tasks.size)
    +    }
    +    // End job.
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === JobExecutionStatus.SUCCEEDED)
    +    }
    +    // Submit a second job that re-uses stage 1 and stage 2. Stage 1 won't 
be re-run, but
    +    // stage 2 will. In any case, the DAGScheduler creates new info 
structures that are copies
    +    // of the old stages, so mimic that behavior here. The "new" stage 1 
is submitted without
    +    // a submission time, which means it is "skipped", and the stage 2 
re-execution should not
    +    // change the stats of the already finished job.
    +    time += 1
    +    val j2Stages = Seq(
    +      new StageInfo(3, 0, "stage1", 4, Nil, Nil, "details1"),
    +      new StageInfo(4, 0, "stage2", 4, Nil, Seq(3), "details2"))
    +    j2Stages.last.submissionTime = Some(time)
    +    listener.onJobStart(SparkListenerJobStart(2, time, j2Stages, null))
    +    assert(store.count(classOf[JobDataWrapper]) === 2)
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.head, 
    +    listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.head))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.last, 
    +    assert(store.count(classOf[StageDataWrapper]) === 5)
    +    time += 1
    +    val j2s2Tasks = createTasks(4, time)
    +    j2s2Tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, 
    +        task))
    +    }
    +    time += 1
    +    j2s2Tasks.foreach { task =>
    +      task.markFinished(TaskState.FINISHED, time)
    +      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, 
    +        "taskType", Success, task, null))
    +    }
    +    time += 1
    +    j2Stages.last.completionTime = Some(time)
    +    listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.last))
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
    +    check[JobDataWrapper](1) { job =>
    +      assert( === 2)
    +      assert( === s1Tasks.size + s2Tasks.size)
    +    }
    +    check[JobDataWrapper](2) { job =>
    +      assert( === JobExecutionStatus.SUCCEEDED)
    +      assert( === 1)
    +      assert( === j2s2Tasks.size)
    +      assert( === 1)
    +      assert( === s1Tasks.size)
    +    }
    +    // Blacklist an executor.
    +    time += 1
    +    listener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, 
"1", 42))
    +    check[ExecutorSummaryWrapper]("1") { exec =>
    +      assert(
    +    }
    +    time += 1
listener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "1"))
    +    check[ExecutorSummaryWrapper]("1") { exec =>
    +      assert(!
    +    }
    +    // Blacklist a node.
    +    time += 1
    +    listener.onNodeBlacklisted(SparkListenerNodeBlacklisted(time, 
"", 2))
    +    check[ExecutorSummaryWrapper]("1") { exec =>
    +      assert(
    +    }
    +    time += 1
    +    listener.onNodeUnblacklisted(SparkListenerNodeUnblacklisted(time, 
    +    check[ExecutorSummaryWrapper]("1") { exec =>
    +      assert(!
    +    }
    +    // Stop executors.
    +    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", 
    +    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", 
    +    Seq("1", "2").foreach { id =>
    +      check[ExecutorSummaryWrapper](id) { exec =>
    +        assert( === id)
    +        assert(!
    +      }
    +    }
    +    // End the application.
    +    listener.onApplicationEnd(SparkListenerApplicationEnd(42L))
    +    check[ApplicationInfoWrapper]("id") { app =>
    +      assert( === "name")
    +      assert( === "id")
    +      assert( === 1)
    +      val attempt =
    +      assert(attempt.attemptId === Some("attempt"))
    +      assert(attempt.startTime === new Date(1L))
    +      assert(attempt.lastUpdated === new Date(42L))
    +      assert(attempt.endTime === new Date(42L))
    +      assert(attempt.duration === 41L)
    +      assert(attempt.sparkUser === "user")
    +      assert(attempt.completed)
    +    }
    +  }
    +  test("storage events") {
    +    val listener = new AppStatusListener(store)
    +    val maxMemory = 42L
    +    // Register a couple of block managers.
    +    val bm1 = BlockManagerId("1", "", 42)
    +    val bm2 = BlockManagerId("2", "", 84)
    +    Seq(bm1, bm2).foreach { bm =>
    +      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, 
    +        new ExecutorInfo(, 1, Map())))
    +      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 
    +      check[ExecutorSummaryWrapper](bm.executorId) { exec =>
    +        assert( === maxMemory)
    +      }
    +    }
    +    val rdd1b1 = RDDBlockId(1, 1)
    +    val level = StorageLevel.MEMORY_AND_DISK
    +    // Submit a stage and make sure the RDD is recorded.
    +    val rddInfo = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
    +    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rddInfo), Nil, 
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new 
    +    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
    +      assert( ===
    +      assert( === rddInfo.numPartitions)
    +      assert( === 
    +    }
    +    // Add partition 1 replicated on two block managers.
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, 
level, 1L, 1L)))
    +    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
    +      assert( === 1L)
    +      assert( === 1L)
    +      assert(
    +      assert( === 1)
    +      val dist =
    +      assert(dist.address === bm1.hostPort)
    +      assert(dist.memoryUsed === 1L)
    +      assert(dist.diskUsed === 1L)
    +      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
    +      assert(
    +      assert( === 1)
    +      val part =
    +      assert(part.blockName ===
    +      assert(part.storageLevel === level.description)
    +      assert(part.memoryUsed === 1L)
    +      assert(part.diskUsed === 1L)
    +      assert(part.executors === Seq(bm1.executorId))
    +    }
    +    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
    +      assert( === 1L)
    +      assert( === 1L)
    +      assert( === 1L)
    +    }
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, 
level, 1L, 1L)))
    +    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
    +      assert( === 2L)
    +      assert( === 2L)
    +      assert( === 2L)
    +      assert( === 1L)
    +      val dist = == 
    +      assert(dist.memoryUsed === 1L)
    +      assert(dist.diskUsed === 1L)
    +      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
    +      val part =
    +      assert(part.memoryUsed === 2L)
    +      assert(part.diskUsed === 2L)
    +      assert(part.executors === Seq(bm1.executorId, bm2.executorId))
    +    }
    +    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
    +      assert( === 1L)
    +      assert( === 1L)
    +      assert( === 1L)
    +    }
    +    // Add a second partition only to bm 1.
    +    val rdd1b2 = RDDBlockId(1, 2)
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b2, 
    +      3L, 3L)))
    +    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
    +      assert( === 5L)
    +      assert( === 5L)
    +      assert( === 2L)
    +      assert( === 2L)
    +      val dist = == 
    +      assert(dist.memoryUsed === 4L)
    +      assert(dist.diskUsed === 4L)
    +      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
    +      val part = ===
    +      assert(part.storageLevel === level.description)
    +      assert(part.memoryUsed === 3L)
    +      assert(part.diskUsed === 3L)
    +      assert(part.executors === Seq(bm1.executorId))
    +    }
    +    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
    +      assert( === 2L)
    +      assert( === 4L)
    +      assert( === 4L)
    +    }
    +    // Remove block 1 from bm 1.
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1,
    +      StorageLevel.NONE, 1L, 1L)))
    +    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
    +      assert( === 4L)
    +      assert( === 4L)
    +      assert( === 2L)
    +      assert( === 2L)
    +      val dist = == 
    +      assert(dist.memoryUsed === 3L)
    +      assert(dist.diskUsed === 3L)
    +      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
    +      val part = ===
    +      assert(part.storageLevel === level.description)
    +      assert(part.memoryUsed === 1L)
    +      assert(part.diskUsed === 1L)
    +      assert(part.executors === Seq(bm2.executorId))
    +    }
    +    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
    +      assert( === 1L)
    +      assert( === 3L)
    +      assert( === 3L)
    +    }
    --- End diff --
    We can add those in when this part of the listener is fixed.


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to