[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23088


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-03 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238492653
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
--- End diff --

Done.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-03 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238492582
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
 // cheaper for disk stores (avoids deserialization).
 val count = {
   Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
-  .closeableIterator()
+if (store.isInstanceOf[LevelDB]) {
--- End diff --

Done. Thanks


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238406928
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
 // cheaper for disk stores (avoids deserialization).
 val count = {
   Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
-  .closeableIterator()
+if (store.isInstanceOf[LevelDB]) {
--- End diff --

Could you invert the check so you don't need to import `LevelDB`? Just to 
avoid importing more implementation details of the kvstore module into this 
class...
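A minimal sketch of the inversion being suggested, assuming `InMemoryStore` (which this class already uses) as the branch condition; illustrative, not necessarily the PR's final shape:

    import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

    // Branch on the in-memory implementation instead of on LevelDB, so the
    // disk-backed store type stays an implementation detail of kvstore.
    def isInMemoryStore(store: KVStore): Boolean = store.isInstanceOf[InMemoryStore]

The count logic can then read `if (isInMemoryStore(store)) <status-index scan> else <run-time-index scan>` without mentioning `LevelDB` at all.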


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238407621
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
--- End diff --

nit: `status = "FAILED"` when param has a default value
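For illustration, a self-contained toy of the named-argument style the nit asks for (hypothetical names):

    // With a default on `status`, call sites name the argument when overriding it.
    def newTask(i: Int, status: String = "SUCCESS"): String = s"task-$i:$status"

    newTask(3)                     // "task-3:SUCCESS" (default)
    newTask(3, status = "FAILED")  // "task-3:FAILED" (named override)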


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-02 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238135277
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
 // cheaper for disk stores (avoids deserialization).
 val count = {
   Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
-  .closeableIterator()
+if (store.isInstanceOf[LevelDB]) {
--- End diff --

Yes. Now, for the disk store case it finds the total task count, and for the in-memory case only the successful task count.

This `count` is used to find the quantile indices for all the task metrics:

https://github.com/apache/spark/blob/676bbb2446af1f281b8f76a5428b7ba75b7588b3/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala#L222
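To make that concrete, a toy run of the quantile-index formula from the linked line, with illustrative numbers (100 tasks total, 80 successful):

    // indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
    val quantiles = Seq(0.25, 0.5, 0.75)
    // Disk store: count = all 100 tasks, so indices can land on failed rows.
    val diskIndices = quantiles.map(q => math.min((q * 100).toLong, 100L - 1)) // 25, 50, 75
    // In-memory: count = 80 successful tasks, so indices stay in that range.
    val memIndices = quantiles.map(q => math.min((q * 80).toLong, 80L - 1))    // 20, 40, 60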


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238110710
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
 // cheaper for disk stores (avoids deserialization).
 val count = {
   Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
-  .closeableIterator()
+if (store.isInstanceOf[LevelDB]) {
--- End diff --

Does this code path need to be different for disk vs. memory? This part seemed like it could work efficiently either way.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-12-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r238110723
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -221,29 +230,49 @@ private[spark] class AppStatusStore(
 // stabilize once the stage finishes. It's also slow, especially with 
disk stores.
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
+// TODO: Summary metrics need to display all the successful tasks' metrics (SPARK-26119).
--- End diff --

It's not ideal, but it's a reasonable solution. Are you OK with it, @vanzin?


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r237197020
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

I'm not sure the fudging would give you much better results. Perhaps a 
little bit...

As for the correct solution, I don't see a lot of good options.

- loading all successful tasks in memory will be a little slow and 
memory-intensive for large stages

Using my 100k stage example, deserializing everything, even once, will still be a little expensive (deserializing 100k tasks vs. deserializing `metricCount * quantileCount` tasks, which is 100 tasks and change in the UI case).

- changing the way the indexing works, so that you can index by specific 
metrics for successful and failed tasks differently, would be tricky, and also 
would require changing the disk store version (to invalidate old stores).

- maybe something else I'm not able to come up with at the moment.

Unless someone has a better idea, maybe just using this code with an in-memory store is an OK temporary "fix".


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-28 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r237164158
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

I see, so there's no real way to do it right without deserializing everything, because that's the only way to know for sure what's successful.

Leaving it as-is isn't so bad; if failed tasks are infrequent then the 
quantiles are still about right.

I can think of fudges like searching ahead from the current index for the 
next successful task and using that. For the common case that might make the 
quantiles a little better. Iterate ahead, incrementing idx, and give up when 
hitting the next index.

Hm, @vanzin, is that too complex vs. taking the hit and deserializing it all? Or should we just punt on this?
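A rough, self-contained sketch of that fudge, modeling the kvstore iterator with a plain sequence (names and shape are illustrative, not the PR's code):

    case class Task(status: String, runTime: Long)

    // For each quantile index, look forward from it for the next successful
    // task, giving up once the next quantile index is reached.
    def fudgedQuantiles(tasks: IndexedSeq[Task], indices: Seq[Int]): IndexedSeq[Double] = {
      val bounds = indices.drop(1) :+ tasks.length  // search limit per index
      indices.zip(bounds).map { case (idx, nextIdx) =>
        tasks.view.slice(idx, math.max(nextIdx, idx + 1))
          .find(_.status == "SUCCESS")
          .map(_.runTime.toDouble)
          .getOrElse(Double.NaN)
      }.toIndexedSeq
    }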


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-28 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r237154542
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

@srowen  Yes, everything is loaded in sorted order based on the index, and then we do the filtering. For the in-memory case this doesn't cause any issue, but for the disk store there is an extra deserialization overhead.

Maybe one possible solution for the disk store case is to fetch the tasks only once and sort them based on the corresponding indices to compute the quantiles.

If that seems too complicated, we could instead tell the user that the summary metrics display the quantile summary of all the tasks, instead of only the completed ones.

Correct me if I am wrong.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-28 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r237135103
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Pardon if I'm missing something, but does this avoid the problem? Everything is still loaded, and then it's filtered.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236455725
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

@vanzin Yes, it seems loading the stage page takes a lot more time if we enable the disk store. The in-memory store seems okay.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236409746
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -95,10 +123,18 @@ class AppStatusStoreSuite extends SparkFunSuite {
 
   private def newTaskData(i: Int): TaskDataWrapper = {
 new TaskDataWrapper(
-  i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, 
false, Nil, None,
+  i, i, i, i, i, i, i.toString, i.toString, "SUCCESS", i.toString, 
false, Nil, None,
   i, i, i, i, i, i, i, i, i, i,
   i, i, i, i, i, i, i, i, i, i,
   i, i, i, i, stageId, attemptId)
   }
 
+  private def failedTaskData(i: Int): TaskDataWrapper = {
--- End diff --

Thanks. done.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236409661
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
--- End diff --

Thanks. I have modified


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236409557
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Thank you @vanzin for the review. I will check the time with large number 
of tasks.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236405732
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
--- End diff --

It doesn't make a runtime different. That's why this is a style nit.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236405278
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
--- End diff --

I don't feel strongly about it but I have written it as above where a new 
block isn't needed. I don't think it makes any runtime difference.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236398604
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -95,10 +123,18 @@ class AppStatusStoreSuite extends SparkFunSuite {
 
   private def newTaskData(i: Int): TaskDataWrapper = {
 new TaskDataWrapper(
-  i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, 
false, Nil, None,
+  i, i, i, i, i, i, i.toString, i.toString, "SUCCESS", i.toString, 
false, Nil, None,
   i, i, i, i, i, i, i, i, i, i,
   i, i, i, i, i, i, i, i, i, i,
   i, i, i, i, stageId, attemptId)
   }
 
+  private def failedTaskData(i: Int): TaskDataWrapper = {
--- End diff --

better to have the status be a parameter to the existing method (with a 
default value)
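i.e., roughly this sketch of the quoted helper, with the hard-coded status replaced by a defaulted parameter:

    private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
      new TaskDataWrapper(
        i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
        i, i, i, i, i, i, i, i, i, i,
        i, i, i, i, i, i, i, i, i, i,
        i, i, i, i, stageId, attemptId)
    }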


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236398482
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
--- End diff --

style for these is `.filter { x => ... }` (happens in a bunch of places)


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236398316
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

There's a comment at the top of this method that explains why `skip` was 
used. It avoids deserialization of data that is not needed here, which can get 
quite expensive. IIRC there's about 26 metrics, and the updated code means 
deserializing `26 * numberOfTasks` task instances from the disk store. With 
large stages that can become really slow.

Try creating a large stage (e.g. `sc.parallelize(1 to 100000, 100000).count()`), loading the resulting event log through the history server, and checking how long it takes to load the stage page the first time. The goal is to make it reasonably fast (I think it's currently in the 4-5s range). Too slow and it makes the page not very usable.

If this makes that too slow, perhaps loading all the successful tasks into memory might be an OK workaround. It sucks (there is a spike in memory usage) but shouldn't be too bad (guessing 256 bytes per `TaskDataWrapper` object, 100k tasks is ~24 MB, which doesn't sound horrible).
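For reference, a minimal recipe for that experiment (paths are illustrative; `spark.eventLog.*` and `spark.history.fs.logDirectory` are the standard settings):

    // 1. Generate the event log from a shell with event logging enabled:
    //    spark-shell --conf spark.eventLog.enabled=true \
    //                --conf spark.eventLog.dir=file:/tmp/spark-events
    sc.parallelize(1 to 100000, 100000).count()  // a single 100k-task stage

    // 2. Point a history server at the same directory
    //    (spark.history.fs.logDirectory=file:/tmp/spark-events), open the
    //    app's stage page, and time the first load.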


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236236422
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

CC @vanzin for the proposed new method.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236236327
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -150,8 +150,9 @@ private[spark] class AppStatusStore(
   Utils.tryWithResource(
 store.view(classOf[TaskDataWrapper])
   .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
+  .index(TaskIndexNames.STATUS)
+  .first("SUCCESS")
+  .last("SUCCESS")
--- End diff --

I see, thank you.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236234523
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -150,8 +150,9 @@ private[spark] class AppStatusStore(
   Utils.tryWithResource(
 store.view(classOf[TaskDataWrapper])
   .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
+  .index(TaskIndexNames.STATUS)
+  .first("SUCCESS")
+  .last("SUCCESS")
--- End diff --

Here the index is "status", so it will read in sorted order based on the status; i.e. the "SUCCESS" tasks will be in one group, the "FAILED" tasks after that, and so on. So if we scan from first("SUCCESS") to last("SUCCESS"), it will count only the successful tasks, not all the tasks between the first and last successful one.

Also, in the UT I have added, even indices have "SUCCESS" tasks and odd indices have "FAILED" tasks, but the count is still 3, i.e. tasks 0, 2, 4.
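A toy illustration of that grouping, assuming only that the index orders rows by their status value:

    // Even ids SUCCESS, odd ids FAILED, as in the UT.
    val statuses = (0 until 5).map(i => if (i % 2 == 1) "FAILED" else "SUCCESS")
    // An index scan over `status` visits rows grouped by status value, so the
    // first("SUCCESS")..last("SUCCESS") range covers only the SUCCESS group.
    val successCount = statuses.sorted.count(_ == "SUCCESS")  // 3 (tasks 0, 2, 4)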


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236234400
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Thank you @srowen for the suggestion. I will try adding the filter method.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236228669
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Hm, I think this reintroduces the problem that this code fixed, though. What about adding some kind of `filter()` method to `KVStoreView`? It could configure an optional `Predicate` that is applied to `iterator()`. Commons Collections has a `FilterIterator` for this.
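A hypothetical sketch of that API, rendered in Scala for brevity (the real kvstore module is Java, and no such method exists in it today):

    import java.util.function.Predicate

    // A view that keeps an optional predicate and applies it lazily while
    // iterating, so filtered-out rows are never returned to the caller.
    trait FilterableView[T] extends Iterable[T] {
      private var pred: Option[Predicate[T]] = None

      def filter(p: Predicate[T]): this.type = { pred = Some(p); this }

      protected def rawIterator: Iterator[T]

      override def iterator: Iterator[T] =
        pred.fold(rawIterator)(p => rawIterator.filter(x => p.test(x)))
    }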


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-26 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236228862
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -150,8 +150,9 @@ private[spark] class AppStatusStore(
   Utils.tryWithResource(
 store.view(classOf[TaskDataWrapper])
   .parent(stageKey)
-  .index(TaskIndexNames.EXEC_RUN_TIME)
-  .first(0L)
+  .index(TaskIndexNames.STATUS)
+  .first("SUCCESS")
+  .last("SUCCESS")
--- End diff --

Doesn't this count all tasks between the first and last successful one, 
rather than count just successful ones?


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236120634
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Yes. If we do `if (status == "SUCCESS")` for every iterator value, we can't use the skip function. Because earlier we knew the exact index we needed to take, i.e. we could skip directly to the 25th percentile, the 50th percentile, and so on. Now we don't know which index holds the 25th percentile of the "SUCCESS" values unless we iterate over each one.

Otherwise, we have to filter the "SUCCESS" tasks up front, like I have done in the PR.
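A toy showing the mismatch (illustrative):

    // With statuses interleaved, the raw index of the N-th SUCCESS task is not
    // N, so skip()-ing by precomputed offsets can land on failed rows.
    val tasks = Seq("SUCCESS", "FAILED", "SUCCESS", "FAILED", "SUCCESS")
    val successPositions = tasks.zipWithIndex.collect { case ("SUCCESS", i) => i }
    // successPositions == Seq(0, 2, 4): the 2nd SUCCESS sits at raw index 2, not 1.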


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236092947
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

I get it, but it seems like there was a particular reason for using an iterator and skipping, and I'm not sure we can just undo it. If this is only to filter for "SUCCESS", that much seems easy enough in the original code with an `if` statement somewhere, no? Similar code above still uses an iterator.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087784
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
+
+  if(quantileTasks.size > indices.length) {
--- End diff --

Done


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087749
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(failedTaskData(i)) }
+val appStore = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles)
+assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i =>
+  if (i % 2 == 1) store.write(failedTaskData(i))
+  else store.write(newTaskData(i))
+}
+val summary = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles).get
+
+val values = (0 to 2).map( i =>
--- End diff --

Done


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087731
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Yes, that was the original intent. Unfortunately, after converting to the Scala collection, the `skip()` functionality is not there. Also, the kvstore doesn't have any filter API for selecting the "SUCCESS" tasks.

The PR was for reducing the computational time of loading the stage page from the disk store (for the history server) by avoiding in-memory sorting.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087746
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(failedTaskData(i)) }
+val appStore = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles)
+assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i =>
+  if (i % 2 == 1) store.write(failedTaskData(i))
--- End diff --

Done


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087741
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
+
+  if(quantileTasks.size > indices.length) {
+  quantileTasks.map(task => fn(task._1).toDouble).toIndexedSeq
+  } else {
+indices.map( index =>
+ fn(quantileTasks.filter(_._2 == 
index).head._1).toDouble).toIndexedSeq
--- End diff --

Modified. Thanks


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236087736
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
--- End diff --

Done


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236085936
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

This used to get an iterator. I wonder if it's safe to materialize the whole collection? It seems to take care to use the iterator to skip over things, for example.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236085919
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
--- End diff --

Why not check for "SUCCESS"?


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236086094
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(failedTaskData(i)) }
+val appStore = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles)
+assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i =>
+  if (i % 2 == 1) store.write(failedTaskData(i))
+  else store.write(newTaskData(i))
+}
+val summary = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles).get
+
+val values = (0 to 2).map( i =>
--- End diff --

2.0 is much clearer than 2.toDouble. But why not just write `Array(0.0, 
2.0, 4.0)`?


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236085876
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
+
+  if(quantileTasks.size > indices.length) {
--- End diff --

The formatting needs adjustment around here: space after 'if', less indent 
on the next line, no space after (, etc.


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236086033
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
 val indices = quantiles.map { q => math.min((q * count).toLong, count 
- 1) }
 
 def scanTasks(index: String)(fn: TaskDataWrapper => Long): 
IndexedSeq[Double] = {
-  Utils.tryWithResource(
-store.view(classOf[TaskDataWrapper])
-  .parent(stageKey)
-  .index(index)
-  .first(0L)
-  .closeableIterator()
-  ) { it =>
-var last = Double.NaN
-var currentIdx = -1L
-indices.map { idx =>
-  if (idx == currentIdx) {
-last
-  } else {
-val diff = idx - currentIdx
-currentIdx = idx
-if (it.skip(diff - 1)) {
-  last = fn(it.next()).toDouble
-  last
-} else {
-  Double.NaN
-}
-  }
-}.toIndexedSeq
+  val quantileTasks = store.view(classOf[TaskDataWrapper])
+.parent(stageKey)
+.index(index)
+.first(0L)
+.asScala
+.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks
+.zipWithIndex
+.filter(x => indices.contains(x._2))
+
+  if(quantileTasks.size > indices.length) {
+  quantileTasks.map(task => fn(task._1).toDouble).toIndexedSeq
+  } else {
+indices.map( index =>
+ fn(quantileTasks.filter(_._2 == 
index).head._1).toDouble).toIndexedSeq
--- End diff --

`.filter` + `.head` = `.find`? It is more efficient. Why not create a `Map`, though, if this needs to be accessed by key?
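For instance, a sketch over the `quantileTasks` pairs from the diff above (`indices` holds `Long`s, hence the key conversion):

    // Build an index -> task map once, then do O(1) lookups per quantile
    // index instead of re-filtering the whole sequence each time.
    val byIndex = quantileTasks.map { case (task, idx) => idx.toLong -> task }.toMap
    indices.map(idx => fn(byIndex(idx)).toDouble).toIndexedSeq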


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r236086063
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite {
 assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i => store.write(failedTaskData(i)) }
+val appStore = new AppStatusStore(store).taskSummary(stageId, 
attemptId, uiQuantiles)
+assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+val store = new InMemoryStore()
+(0 until 5).foreach { i =>
+  if (i % 2 == 1) store.write(failedTaskData(i))
--- End diff --

Braces for if-else. I also personally think a for loop is more obvious 
here: `for (i <- 0 until 5) {`


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-21 Thread shahidki31
GitHub user shahidki31 reopened a pull request:

https://github.com/apache/spark/pull/23088

[SPARK-26119][CORE][WEBUI]Task summary table should contain only successful 
tasks' metrics

## What changes were proposed in this pull request?

The task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only the successful tasks, to follow the behavior of previous versions of Spark.


## How was this patch tested?
Added UT; screenshots attached.
Before patch:
![screenshot from 2018-11-20 
00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png)


![screenshot from 2018-11-20 
01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shahidki31/spark summaryMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23088


commit cfe2d5f3744f2d10d917883713cd78678b5157a1
Author: Shahid 
Date:   2018-11-19T18:23:57Z

task summary should contain only successful tasks

commit 8f4498e0c67f8a83401f3b0e06aef4922ef49c20
Author: Shahid 
Date:   2018-11-21T18:37:01Z

update PR




---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-20 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r235028381
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -238,8 +239,16 @@ private[spark] class AppStatusStore(
 val diff = idx - currentIdx
--- End diff --

Hi @gengliangwang, I will update the PR. Actually, there are more changes required to fix this. Thanks!


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-20 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/23088#discussion_r235027227
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -238,8 +239,16 @@ private[spark] class AppStatusStore(
 val diff = idx - currentIdx
--- End diff --

`diff` could be negative here?


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-20 Thread shahidki31
Github user shahidki31 closed the pull request at:

https://github.com/apache/spark/pull/23088


---




[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...

2018-11-19 Thread shahidki31
GitHub user shahidki31 opened a pull request:

https://github.com/apache/spark/pull/23088

[SPARK-26119][CORE][WEBUI]Task summary table should contain only successful 
tasks' metrics

## What changes were proposed in this pull request?

The task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only the successful tasks, to follow the behavior of previous versions of Spark.


## How was this patch tested?
Added UT; screenshots attached.
Before patch:
![screenshot from 2018-11-20 
00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png)


![screenshot from 2018-11-20 
01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shahidki31/spark summaryMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23088


commit cfe2d5f3744f2d10d917883713cd78678b5157a1
Author: Shahid 
Date:   2018-11-19T18:23:57Z

task summary should contain only successful tasks




---
