Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ee2bd70a4 -> a6664dcd8


[SPARK-6079] Use index to speed up StatusTracker.getJobIdsForGroup()

`StatusTracker.getJobIdsForGroup()` is implemented via a linear scan over a 
HashMap rather than using an index, which might be an expensive operation if 
there are many (e.g. thousands of) retained jobs.

This patch adds a new map to `JobProgressListener` in order to speed up these 
lookups.

Author: Josh Rosen <joshro...@databricks.com>

Closes #4830 from JoshRosen/statustracker-job-group-indexing and squashes the 
following commits:

e39c5c7 [Josh Rosen] Address review feedback
6709fb2 [Josh Rosen] Merge remote-tracking branch 'origin/master' into 
statustracker-job-group-indexing
2c49614 [Josh Rosen] getOrElse
97275a7 [Josh Rosen] Add jobGroup to jobId index to JobProgressListener

(cherry picked from commit d44a3362ed8cf3068f8ff233e13851a39da42219)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6664dcd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6664dcd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6664dcd

Branch: refs/heads/branch-1.3
Commit: a6664dcd88a0bdaa8985844cd485d3c4a71eba1b
Parents: ee2bd70
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Mar 25 17:40:00 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Apr 2 13:12:32 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkStatusTracker.scala   |  3 +-
 .../spark/ui/jobs/JobProgressListener.scala     | 23 +++++++++++++--
 .../ui/jobs/JobProgressListenerSuite.scala      | 31 ++++++++++++++++++--
 3 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6664dcd/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index edbdda8..34ee3a4 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, 
Seq.empty).toArray
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6664dcd/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 937d95a..78eae53 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   // These type aliases are public because they're used in the types of public 
fields:
 
   type JobId = Int
+  type JobGroupId = String
   type StageId = Int
   type StageAttemptId = Int
   type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
+  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
 
   // Stages:
   val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     Map(
       "jobIdToData" -> jobIdToData.size,
       "stageIdToData" -> stageIdToData.size,
-      "stageIdToStageInfo" -> stageIdToInfo.size
+      "stageIdToStageInfo" -> stageIdToInfo.size,
+      "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+      // Since jobGroupToJobIds is a map of sets, check that we don't leak keys 
with empty values:
+      "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
     )
   }
 
@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     if (jobs.size > retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
       jobs.take(toRemove).foreach { job =>
-        jobIdToData.remove(job.jobId)
+        // Remove the job's UI data, if it exists
+        jobIdToData.remove(job.jobId).foreach { removedJob =>
+          // A null jobGroupId is used for jobs that are run without a job 
group
+          val jobGroupId = removedJob.jobGroup.orNull
+          // Remove the job group -> job mapping entry, if it exists
+          jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+            jobsInGroup.remove(job.jobId)
+            // If this was the last job in this job group, remove the map 
entry for the job group
+            if (jobsInGroup.isEmpty) {
+              jobGroupToJobIds.remove(jobGroupId)
+            }
+          }
+        }
       }
       jobs.trimStart(toRemove)
     }
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
         stageIds = jobStart.stageIds,
         jobGroup = jobGroup,
         status = JobExecutionStatus.RUNNING)
+    // A null jobGroupId is used for jobs that are run without a job group
+    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new 
HashSet[JobId]).add(jobStart.jobId)
     jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
     // Compute (a potential underestimate of) the number of tasks that will be 
run by this job.
     // This may be an underestimate because the job start event references all 
of the result

http://git-wip-us.apache.org/repos/asf/spark/blob/a6664dcd/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 730a4b5..c0c28cb 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui.jobs
 
+import java.util.Properties
+
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     SparkListenerStageCompleted(stageInfo)
   }
 
-  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+  private def createJobStartEvent(
+      jobId: Int,
+      stageIds: Seq[Int],
+      jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
       new StageInfo(stageId, 0, stageId.toString, 0, null, "")
     }
-    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
+    val properties: Option[Properties] = jobGroup.map { groupId =>
+      val props = new Properties()
+      props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+      props
+    }
+    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, 
properties.orNull)
   }
 
   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with 
LocalSparkContext with Matc
     listener.stageIdToActiveJobIds.size should be (0)
   }
 
+  test("test clearing of jobGroupToJobIds") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run 51 jobs (IDs 0 through 50), each with one stage
+    for (jobId <- 0 to 50) {
+      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = 
Some(jobId.toString)))
+      listener.onStageSubmitted(createStageStartEvent(0))
+      listener.onStageCompleted(createStageEndEvent(0, failed = false))
+      listener.onJobEnd(createJobEndEvent(jobId, false))
+    }
+    assertActiveJobsStateIsEmpty(listener)
+    // This collection won't become empty, but it should be bounded by 
spark.ui.retainedJobs
+    listener.jobGroupToJobIds.size should be (5)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to