[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20481


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165934452
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
       return
     }

-    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
-      countToDelete.toInt) { s =>
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
--- End diff --

ditto


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165934385
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
       return
     }

-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
-      countToDelete.toInt) { j =>
+    val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
--- End diff --

use `TaskIndexNames.COMPLETION_TIME`?
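
(For illustration, a minimal sketch of this suggestion, assuming the constant is added to `TaskIndexNames` as discussed below; the call site here is hypothetical:)

```scala
// Hedged sketch: reference the shared constant instead of a raw string
// literal, so the index name is defined in one place.
val view = kvstore.view(classOf[JobDataWrapper])
  .index(TaskIndexNames.COMPLETION_TIME)
  .first(0L)
```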


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165568794
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

+1


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165559743
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

No, right now there's no support for that for indices.


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165553002
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

I've asked this before: is it possible to put an ID instead of the index name 
into the kvstore? Then we could use long index names.
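
(Purely illustrative sketch of the idea — the kvstore has no such mechanism today, per the reply elsewhere in this thread: keep a long, readable name in code and persist only a small numeric id as the on-disk index key.)

```scala
object IndexIds {
  // Hypothetical registry; the names and ids here are made up for this sketch.
  private val ids = Map("completionTime" -> 0, "active" -> 1)
  def idFor(name: String): Int = ids(name)
}
```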


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165544206
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
 
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
--- End diff --

nit: `-1` -> `-1L`


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165544232
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
   @JsonIgnore @KVIndex("active")
   private def active: Boolean = info.status == StageStatus.ACTIVE
 
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1)
--- End diff --

nit: `-1` -> `-1L`


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165543319
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

+1


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165541492
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "completionTime"
--- End diff --

Could you use a shorter name like the others? It saves a little bit more 
space on disk because there are so many tasks in large apps.
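
(For illustration only: an abbreviation in the style of the existing entries above, e.g. `"sta"`, `"idx"`; the exact short name is an assumption, not what the PR settled on.)

```scala
// A short key saves a few bytes per indexed task entry on disk.
final val COMPLETION_TIME = "ct"  // hypothetical abbreviation
```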


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165529443
  
--- Diff: core/src/main/scala/org/apache/spark/status/storeTypes.scala ---
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
 
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(Long.MaxValue)
--- End diff --

This is fine, but if you want you can probably replace the filters in the 
listener by setting this to `-1` for running jobs / stages / others, and 
starting iteration at "0".
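
(A minimal sketch of that alternative, assembled from the diffs quoted elsewhere in this thread:)

```scala
// Index running entities at -1 and completed ones at their end time...
@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)

// ...so cleanup can start iteration at 0 and skip running entries
// without a per-element filter.
val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
```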


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165512677
  
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }

-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

Adding indices is super easy. e.g.:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/storeTypes.scala#L90
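
(The linked line is the existing `active` index on `StageDataWrapper`, also quoted in a diff elsewhere in this thread:)

```scala
// An index is just an annotated getter; the kvstore orders entries by its value.
@JsonIgnore @KVIndex("active")
private def active: Boolean = info.status == StageStatus.ACTIVE
```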


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165512101
  
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }

-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

@vanzin Yeah, I understand the expensive sort. However, adding indices 
needs more work. Do you have time to try it since I'm not familiar with LevelDB?


---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20481#discussion_r165510869
  
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -69,14 +69,17 @@ private[spark] object KVUtils extends Logging {
     db
   }

-  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
-  def viewToSeq[T](
-      view: KVStoreView[T],
-      max: Int)
-      (filter: T => Boolean): Seq[T] = {
+  /**
+   * Turns a KVStoreView into a Scala sequence, applying a filter, sorting the sequence and
+   * selecting the first `max` values.
+   */
+  def viewToSeq[T, S: Ordering](
+      view: KVStoreView[T],
+      max: Int)
+      (filter: T => Boolean)(sorter: T => S): Seq[T] = {
     val iter = view.closeableIterator()
     try {
-      iter.asScala.filter(filter).take(max).toList
+      iter.asScala.filter(filter).toList.sortBy(sorter).take(max)
--- End diff --

So, aside from the two closure parameters making the calls super ugly, this 
is more expensive than the previous version.

Previously:
- filter as you iterate over view
- limit iteration
- materialize "max" elements

Now:
- filter as you iterate over view
- materialize all elements that pass the filter
- sort and take "max" elements

This will, at least, make replaying large apps a lot slower, given the 
filter in the task cleanup method.

```
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
  !live || t.status != TaskState.RUNNING.toString()
}
```

So, when replaying, every time you need to do a cleanup of tasks, you'll 
deserialize *all* tasks for the stage. If you have a stage with 10s of 
thousands of tasks, that's super expensive.

If all you want to change here is the sorting of jobs, I'd recommend adding 
a new index to `JobDataWrapper` that sorts them by end time. Then you can do 
the sorting before you even call this method, by setting up the `view` 
appropriately.

If you also want to sort the others (stages, tasks, and sql executions), 
you could also create indices for those.

Or you could find a way to do this that is not so expensive on the replay 
side...

If adding indices, though, I'd probably try to get this into 2.3.0 since it 
would change the data written to disk.
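
(A hedged sketch of the recommended shape, assuming the `completionTime` index on `JobDataWrapper` discussed in this thread; `countToDelete` comes from the surrounding listener code:)

```scala
// Let the store do the ordering and limiting: iterate the completion-time
// index starting at 0 (completed entries only) and cap the result size,
// instead of materializing and sorting everything.
val view = kvstore.view(classOf[JobDataWrapper])
  .index("completionTime")
  .first(0L)
  .max(countToDelete)
```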



---




[GitHub] spark pull request #20481: [SPARK-23307][WEBUI]Sort jobs/stages/tasks/querie...

2018-02-01 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/20481

[SPARK-23307][WEBUI]Sort jobs/stages/tasks/queries with the completed 
timestamp before cleaning them up

## What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries with the completed timestamp before cleaning 
them up, to make the behavior consistent with 2.2.

## How was this patch tested?

- Jenkins.
- Manually ran the following code with `spark.ui.retainedJobs=10` and 
confirmed job 0 was kept in the UI:
```
new Thread() {
  override def run() {
    // job 0
    sc.makeRDD(1 to 1, 1).foreach { i =>
      Thread.sleep(1)
    }
  }
}.start()

Thread.sleep(1000)

for (_ <- 1 to 20) {
  new Thread() {
    override def run() {
      sc.makeRDD(1 to 1, 1).foreach { i =>
      }
    }
  }.start()
}

Thread.sleep(15000)
sc.makeRDD(1 to 1, 1).foreach { i =>
}
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-23307

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20481


commit 761f1ee1f3ef98a1c9d5f3d7e5c4ecbb71755656
Author: Shixiong Zhu 
Date:   2018-02-01T21:59:43Z

Sort jobs/stages/tasks/queries with the completed timestamp before cleaning 
them up




---
