[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20481

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165934452

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
       return
     }
-    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
-      countToDelete.toInt) { s =>
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
--- End diff --

ditto
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165934385

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
       return
     }
-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
-      countToDelete.toInt) { j =>
+    val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
--- End diff --

use `TaskIndexNames.COMPLETION_TIME`?
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165568794

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

+1
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165559743

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

No, right now there's no support for that for indices.
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165553002

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

I've asked this before: is it possible to store an ID instead of the index name in the kvstore? Then we could use long index names.
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165544206

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
--- End diff --

nit: `-1` -> `-1L`
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165544232

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
   @JsonIgnore @KVIndex("active")
   private def active: Boolean = info.status == StageStatus.ACTIVE
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
--- End diff --

nit: `-1` -> `-1L`
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165543319

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

+1
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165541492

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

Could you use a shorter name like the others? It saves a bit more disk space, because large apps have so many tasks.
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165529443

--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue)
--- End diff --

This is fine, but if you want you can probably replace the filters in the listener by setting this to `-1` for running jobs / stages / others, and starting iteration at "0".
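The `-1` sentinel idea can be illustrated outside Spark with a minimal sketch. Here a sorted `TreeMap` stands in for the kvstore's `completionTime` index; `JobEntry`, `CompletionIndexSketch`, `buildIndex`, and `jobsToDelete` are hypothetical names, not Spark APIs. Unfinished jobs get the key `-1L`, so starting iteration at `0` (which is what `.first(0L)` does on the view in the later revision of the listener) skips them without a separate filter, and the earliest-finished jobs come out first.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical stand-in for JobDataWrapper: only the fields this sketch needs.
final case class JobEntry(jobId: Int, completionTime: Option[Long]) {
  // Index key: -1L while the job is still running (no completion time yet).
  def completionKey: Long = completionTime.getOrElse(-1L)
}

object CompletionIndexSketch {
  // A TreeMap keyed by (completionKey, jobId) plays the role of a sorted
  // "completionTime" index; jobId breaks ties between equal timestamps.
  def buildIndex(jobs: Seq[JobEntry]): TreeMap[(Long, Int), JobEntry] =
    TreeMap(jobs.map(j => (j.completionKey, j.jobId) -> j): _*)

  // Start iterating at completion time 0, so running jobs (key -1L) are never
  // visited and the earliest-finished jobs are returned first.
  def jobsToDelete(index: TreeMap[(Long, Int), JobEntry], count: Int): List[JobEntry] =
    index.iteratorFrom((0L, Int.MinValue)).map(_._2).take(count).toList
}
```

Because the ordering lives in the index, the cleanup only reads the `count` entries it is going to delete, instead of scanning and sorting everything.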
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165512677

--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }
-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

Adding indices is super easy, e.g.:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/storeTypes.scala#L90
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165512101

--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }
-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

@vanzin Yeah, I understand the sort is expensive. However, adding indices needs more work. Do you have time to try it, since I'm not familiar with LevelDB?
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165510869

--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }
-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

So, aside from the two closure parameters making the calls super ugly, this is more expensive than the previous version.

Previously:
- filter as you iterate over view
- limit iteration
- materialize "max" elements

Now:
- filter as you iterate over view
- materialize all elements that pass the filter
- sort and take "max" elements

This will, at least, make replaying large apps a lot slower, given the filter in the task cleanup method.

```
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
  !live || t.status != TaskState.RUNNING.toString()
}
```

So, when replaying, every time you need to clean up tasks, you'll deserialize *all* tasks for the stage. If you have a stage with tens of thousands of tasks, that's super expensive.

If all you want to change here is the sorting of jobs, I'd recommend adding a new index to `JobDataWrapper` that sorts them by end time. Then you can do the sorting before you even call this method, by setting up the `view` appropriately. If you also want to sort the others (stages, tasks, and SQL executions), you could create indices for those too.
Or you could find a way to do this that is not so expensive on the replay side... If adding indices, though, I'd probably try to get this into 2.3.0, since it would change the data written to disk.
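To make the cost difference above concrete, here is a rough sketch in plain Scala with no Spark classes. `CountingIterator`, `filterThenTake`, and `filterSortTake` are hypothetical names; the counter simply records how many elements each shape pulls from the underlying iterator, as a stand-in for how many entries would be read and deserialized from the store.

```scala
object ViewToSeqCostSketch {
  // Counts how many elements are pulled from the underlying iterator; this stands in
  // for the number of entries that would be read and deserialized from the store.
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    var pulled: Int = 0
    def hasNext: Boolean = underlying.hasNext
    def next(): T = { pulled += 1; underlying.next() }
  }

  // Previous shape: filter lazily and stop as soon as `max` matches are found.
  def filterThenTake[T](it: Iterator[T], max: Int)(filter: T => Boolean): List[T] =
    it.filter(filter).take(max).toList

  // Shape in the diff: materialize every match, then sort and keep the first `max`.
  def filterSortTake[T, S: Ordering](it: Iterator[T], max: Int)(filter: T => Boolean)(
      sorter: T => S): List[T] =
    it.filter(filter).toList.sortBy(sorter).take(max)
}
```

With 10,000 entries, an even-number filter, and `max = 5`, the lazy version stops after pulling 10 elements, while the sorting version has to pull all 10,000 before it can return anything.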
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/20481

[SPARK-23307][WEBUI]Sort jobs/stages/tasks/queries with the completed timestamp before cleaning up them

## What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries by their completion timestamp before cleaning them up, to make the behavior consistent with 2.2.

## How was this patch tested?

- Jenkins.
- Manually ran the following code with `spark.ui.retainedJobs=10` and confirmed that job 0 was kept in the UI:

```
new Thread() {
  override def run() {
    // job 0
    sc.makeRDD(1 to 1, 1).foreach { i =>
      Thread.sleep(1)
    }
  }
}.start()

Thread.sleep(1000)

for (_ <- 1 to 20) {
  new Thread() {
    override def run() {
      sc.makeRDD(1 to 1, 1).foreach { i => }
    }
  }.start()
}

Thread.sleep(15000)
sc.makeRDD(1 to 1, 1).foreach { i => }
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-23307

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20481.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20481

commit 761f1ee1f3ef98a1c9d5f3d7e5c4ecbb71755656
Author: Shixiong Zhu
Date: 2018-02-01T21:59:43Z

    Sort jobs/stages/tasks/queries with the completed timestamp before cleaning up them
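The manual test relies on the difference between deleting jobs in job-ID order and deleting them in completion-time order. The sketch below simulates that with made-up timestamps, assuming (hypothetically) that job 0 finishes after jobs 1 to 20; `Finished` and `retain` are illustrative names, not Spark code. With 21 finished jobs and 10 retained, ID order evicts job 0, while completion-time order keeps it.

```scala
object RetentionOrderSketch {
  // Hypothetical record for a finished job: its ID and completion time in ms.
  final case class Finished(jobId: Int, completionTime: Long)

  // Keep at most `retained` jobs by deleting the first ones in `key` order;
  // returns the IDs of the jobs that survive the cleanup.
  def retain[K: Ordering](jobs: Seq[Finished], retained: Int)(key: Finished => K): Set[Int] = {
    val countToDelete = math.max(jobs.size - retained, 0)
    val deleted = jobs.sortBy(key).take(countToDelete).map(_.jobId).toSet
    jobs.map(_.jobId).toSet -- deleted
  }
}
```

For example, `retain(jobs, 10)(_.jobId)` drops the ten lowest IDs plus one more, whereas `retain(jobs, 10)(_.completionTime)` drops the eleven jobs that finished earliest.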