[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/23088

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238492653

--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }

+  test("only successfull task have taskSummary") {
+    val store = new InMemoryStore()
+    (0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
--- End diff --

Done.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238492582

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(TaskIndexNames.EXEC_RUN_TIME)
-          .first(0L)
-          .closeableIterator()
+        if (store.isInstanceOf[LevelDB]) {
--- End diff --

Done, thanks.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238406928

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -148,11 +148,20 @@ (count / `isInstanceOf[LevelDB]` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        if (store.isInstanceOf[LevelDB]) {
--- End diff --

Could you invert the check so you don't need to import `LevelDB`? Just to avoid importing more implementation details of the kvstore module into this class...
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238407621

--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -77,6 +77,34 @@ (new test excerpt, quoted in full earlier in the thread; the comment is on the line below)
+    (0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
--- End diff --

nit: `status = "FAILED"` when the param has a default value
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238135277

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -148,11 +148,20 @@ (count / `isInstanceOf[LevelDB]` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        if (store.isInstanceOf[LevelDB]) {
--- End diff --

Yes. Now, for the disk store case it finds the total task count, and for the in-memory case only the successful task count. This 'count' is used to find the quantile indices for all the task metrics. https://github.com/apache/spark/blob/676bbb2446af1f281b8f76a5428b7ba75b7588b3/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala#L222
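[Editorial aside] The role that `count` plays can be seen in a small standalone sketch (plain Scala, independent of the kvstore classes; `QuantileIndices` is an illustrative name, not Spark code). The quantile positions are derived purely from the count, which is why an inflated count shifts every index:

```scala
// Standalone sketch of the quantile-index formula discussed here,
// mirroring `quantiles.map { q => math.min((q * count).toLong, count - 1) }`.
object QuantileIndices {
  def indices(quantiles: Seq[Double], count: Long): IndexedSeq[Long] =
    quantiles.map(q => math.min((q * count).toLong, count - 1)).toIndexedSeq
}
```

For example, with 3 successful tasks `QuantileIndices.indices(Seq(0.0, 0.5, 1.0), 3)` gives `IndexedSeq(0, 1, 2)`, but if failed tasks inflate the count to 6 the same quantiles give `IndexedSeq(0, 3, 5)`, which is why counting failed tasks skews the summary.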
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238110710

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -148,11 +148,20 @@ (count / `isInstanceOf[LevelDB]` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        if (store.isInstanceOf[LevelDB]) {
--- End diff --

Does this code path need to be different for disk vs memory? This part seemed like it could work efficiently either way.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r238110723

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -221,29 +230,49 @@ private[spark] class AppStatusStore(
     // stabilize once the stage finishes. It's also slow, especially with disk stores.
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

+    // TODO Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
--- End diff --

It's not ideal but it's a reasonable solution. Are you OK with it, @vanzin?
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r237197020

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ private[spark] class AppStatusStore(
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(index)
-          .first(0L)
-          .closeableIterator()
-      ) { it =>
-        var last = Double.NaN
-        var currentIdx = -1L
-        indices.map { idx =>
-          if (idx == currentIdx) {
-            last
-          } else {
-            val diff = idx - currentIdx
-            currentIdx = idx
-            if (it.skip(diff - 1)) {
-              last = fn(it.next()).toDouble
-              last
-            } else {
-              Double.NaN
-            }
-          }
-        }.toIndexedSeq
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

I'm not sure the fudging would give you much better results. Perhaps a little bit... As for the correct solution, I don't see a lot of good options.

- Loading all successful tasks in memory will be a little slow and memory-intensive for large stages. Using my 100k stage example, deserializing everything, even once, will still be a little expensive (deserializing 100k tasks vs. deserializing `metricCount * quantileCount` tasks, which is 100 tasks and change in the UI case).
- Changing the way the indexing works, so that you can index by specific metrics for successful and failed tasks differently, would be tricky, and would also require changing the disk store version (to invalidate old stores).
- Maybe something else I'm not able to come up with at the moment.

Unless someone has a better idea, maybe just using this code when using an in-memory store is an ok temporary "fix".
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r237164158

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

I see, so there's no real way to do it right without deserializing everything, because that's the only way to know for sure what's successful. Leaving it as-is isn't so bad; if failed tasks are infrequent then the quantiles are still about right. I can think of fudges like searching ahead from the current index for the next successful task and using that. For the common case that might make the quantiles a little better: iterate ahead, incrementing idx, and give up when hitting the next index. Hm, @vanzin, is that too complex vs. taking the hit and deserializing it all? Or just punt on this?
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r237154542

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

@srowen Yes, everything is loaded in "sorted" order based on the index, and then we do the filtering. For the in-memory case this doesn't cause any issue, but for the disk store there is extra deserialization overhead. Maybe one possible solution for the disk store case would be to bring the data in only once and sort based on the corresponding indices to compute the quantiles. If that solution seems too complicated, then we can tell the user that the summary metrics display the quantile summary of all the tasks instead of only the completed ones. Correct me if I am wrong.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r237135103

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Pardon if I'm missing something, but does this avoid the problem? Everything is loaded, and then it's filtered.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236455725

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

@vanzin Yes. It seems loading the stage page takes a lot more time if we enable the disk store. The in-memory store seems okay.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236409746

--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -95,10 +123,18 @@ class AppStatusStoreSuite extends SparkFunSuite {
   private def newTaskData(i: Int): TaskDataWrapper = {
     new TaskDataWrapper(
-      i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+      i, i, i, i, i, i, i.toString, i.toString, "SUCCESS", i.toString, false, Nil, None,
       i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i,
       stageId, attemptId)
   }

+  private def failedTaskData(i: Int): TaskDataWrapper = {
--- End diff --

Thanks, done.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236409661

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (removed `scanTasks` block as quoted in full earlier in the thread)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
+        .parent(stageKey)
+        .index(index)
+        .first(0L)
+        .asScala
+        .filter(_.status == "SUCCESS") // Filter "SUCCESS" tasks
+        .zipWithIndex
+        .filter(x => indices.contains(x._2))
--- End diff --

Thanks. I have modified it.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236409557

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Thank you @vanzin for the review. I will check the time with a large number of tasks.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236405732

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` replacement excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        .filter(x => indices.contains(x._2))
--- End diff --

It doesn't make a runtime difference. That's why this is a style nit.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236405278

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` replacement excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        .filter(x => indices.contains(x._2))
--- End diff --

I don't feel strongly about it, but I have written it as above where a new block isn't needed. I don't think it makes any runtime difference.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236398604

--- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala ---
@@ -95,10 +123,18 @@ (`newTaskData`/`failedTaskData` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+  private def failedTaskData(i: Int): TaskDataWrapper = {
--- End diff --

Better to have the status be a parameter to the existing method (with a default value).
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236398482

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` replacement excerpt, quoted in full earlier in the thread; the comment is on the line below)
+        .filter(x => indices.contains(x._2))
--- End diff --

Style for these is `.filter { x => ... }` (happens in a bunch of places).
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236398316

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

There's a comment at the top of this method that explains why `skip` was used. It avoids deserialization of data that is not needed here, which can get quite expensive. IIRC there are about 26 metrics, and the updated code means deserializing `26 * numberOfTasks` task instances from the disk store. With large stages that can become really slow.

Try creating a large stage (e.g. `sc.parallelize(1 to 10, 10).count()`), loading the resulting event log through the history server, and checking how long it takes to load the stage page the first time. The goal is to make it reasonably fast (I think it's currently in the 4-5s range). Too slow and it makes the page not very usable.

If this makes that too slow, perhaps loading all the successful tasks into memory might be an ok workaround. It sucks (there is a spike in memory usage) but shouldn't be too bad (guessing 256 bytes per `TaskDataWrapper` object, 100k tasks ~ 24Mb, which doesn't sound horrible).
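[Editorial aside] The skip-based pattern being defended here can be sketched with a plain Scala iterator (a simplification: `KVStoreIterator.skip` is replaced by a hand-rolled helper, plain `Long` elements stand in for deserialized `TaskDataWrapper` instances, and `SkipScan` is an illustrative name):

```scala
// Sketch of the skip-based quantile scan: only the elements at the
// precomputed (sorted, ascending) quantile indices are read; everything
// in between is skipped, which in the kvstore case avoids deserialization.
object SkipScan {
  // Stand-in for KVStoreIterator.skip(n): advance past n elements and
  // report whether another element remains.
  private def skip(it: Iterator[Long], n: Long): Boolean = {
    var remaining = n
    while (remaining > 0 && it.hasNext) { it.next(); remaining -= 1 }
    remaining == 0 && it.hasNext
  }

  def scan(it: Iterator[Long], indices: Seq[Long]): IndexedSeq[Double] = {
    var last = Double.NaN
    var currentIdx = -1L
    indices.map { idx =>
      if (idx == currentIdx) {
        last                       // repeated quantile index: reuse value
      } else {
        val diff = idx - currentIdx
        currentIdx = idx
        if (skip(it, diff - 1)) {  // jump straight to the target index
          last = it.next().toDouble
          last
        } else {
          Double.NaN               // ran off the end of the iterator
        }
      }
    }.toIndexedSeq
  }
}
```

For instance, `SkipScan.scan((0L until 100L).iterator, Seq(0L, 49L, 99L))` yields `IndexedSeq(0.0, 49.0, 99.0)` while reading only the three indexed values; the trade-off in the thread is that this jump arithmetic only works when the target indices are positions in the raw index, not positions among successful tasks.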
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236236422

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

CC @vanzin for the proposed new method.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236236327

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -150,8 +150,9 @@ private[spark] class AppStatusStore(
       Utils.tryWithResource(
         store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
-          .index(TaskIndexNames.EXEC_RUN_TIME)
-          .first(0L)
+          .index(TaskIndexNames.STATUS)
+          .first("SUCCESS")
+          .last("SUCCESS")
--- End diff --

I see, thank you.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236234523

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -150,8 +150,9 @@ (`STATUS` index excerpt, quoted in full earlier in the thread; the comment is on the line below)
+          .last("SUCCESS")
--- End diff --

Here the index is "status", so it will read in sorted order based on the status, i.e. the "SUCCESS" tasks will be in one group, the "FAILED" tasks after that, and so on. So if we go from first("SUCCESS") to last("SUCCESS"), it will count only successful tasks, not all tasks between the first and last successful one. Also, in the UT I have added, even indices have "SUCCESS" tasks and odd indices have "FAILED" tasks, but the count is still 3, i.e. ("0", "2", "4").
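[Editorial aside] The grouping argument above can be checked with a tiny standalone model (plain Scala; lexicographic sorting stands in for the kvstore's status index, and `StatusIndexModel` is an illustrative name):

```scala
// Model of a status index: entries come out grouped by status, so the
// span from the first to the last "SUCCESS" entry contains exactly the
// successful tasks, even when statuses alternate in insertion order.
object StatusIndexModel {
  def successCount(statuses: Seq[String]): Int = {
    val byStatus = statuses.sorted            // index order: grouped by status
    val first = byStatus.indexOf("SUCCESS")
    if (first < 0) 0
    else byStatus.lastIndexOf("SUCCESS") - first + 1
  }
}
```

With the alternating pattern from the UT, `StatusIndexModel.successCount(Seq("SUCCESS", "FAILED", "SUCCESS", "FAILED", "SUCCESS"))` is 3: after grouping, all "FAILED" entries precede the contiguous run of "SUCCESS" entries.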
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236234400

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Thank you @srowen for the suggestion. I will try adding the filter method.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236228669

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Hm, I think this reintroduces the problem that this code fixed, though. What about adding some kind of filter() method to KVStoreView? It could configure an optional `Predicate` that is applied to `iterator()`. Commons Collections has a `FilterIterator` for this.
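[Editorial aside] A rough sketch of this proposal in plain Scala (hypothetical: the real `KVStoreView` is a Java class and this is not its API; `FilteredView` and `Task` are made-up names). The point is that the predicate is applied lazily to the iterator, so any skipping done downstream counts only matching elements:

```scala
// Hypothetical filtered view: an optional predicate is applied lazily
// when iterator() is built, in the spirit of Commons Collections'
// FilterIterator, so failed tasks never occupy positions in the stream.
final case class Task(runTime: Long, status: String)

final case class FilteredView(underlying: Seq[Task],
                              pred: Task => Boolean = _ => true) {
  def filter(p: Task => Boolean): FilteredView = copy(pred = p)
  def iterator: Iterator[Task] = underlying.iterator.filter(pred)
}
```

Usage: `FilteredView(tasks).filter(_.status == "SUCCESS").iterator` yields only the successful tasks, so a skip-based quantile scan over it would index positions among successes directly.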
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236228862

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -150,8 +150,9 @@ (`STATUS` index excerpt, quoted in full earlier in the thread; the comment is on the line below)
+          .last("SUCCESS")
--- End diff --

Doesn't this count all tasks between the first and last successful one, rather than counting just the successful ones?
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23088#discussion_r236120634

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -222,29 +223,20 @@ (`scanTasks` excerpt, quoted in full earlier in the thread; the comment is on the line below)
+      val quantileTasks = store.view(classOf[TaskDataWrapper])
--- End diff --

Yes. If we do `if (status == "SUCCESS")` for every iterator value, we can't use the skip function, because previously we knew the exact index we needed to take, i.e. we could directly skip to the 25th percentile, the 50th percentile, and so on. Now we don't know which index holds the 25th percentile of the "SUCCESS" values unless we iterate over each one. Otherwise, we have to filter the "SUCCESS" tasks beforehand, like I have done in the PR.
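[Editorial aside] The filter-first approach described here can be sketched in isolation (plain Scala collections standing in for the kvstore view; `FilteredQuantiles` and `Task` are illustrative names):

```scala
// Filter-then-index sketch: keep only SUCCESS tasks from an
// index-sorted sequence, then pick the entries whose position *among
// the successes* matches a precomputed quantile index.
object FilteredQuantiles {
  final case class Task(runTime: Long, status: String)

  def quantileRunTimes(sortedByRunTime: Seq[Task], indices: Set[Int]): Seq[Long] =
    sortedByRunTime
      .filter(_.status == "SUCCESS")   // drop failed tasks first
      .zipWithIndex                    // positions count successes only
      .collect { case (t, i) if indices.contains(i) => t.runTime }
}
```

For example, with run times 1..5 sorted and statuses alternating SUCCESS/FAILED, quantile indices `Set(0, 2)` select run times 1 and 5: the failed tasks never shift the quantile positions, which is exactly what the skip-based arithmetic could not guarantee.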
[GitHub] spark pull request #23088: [SPARK-26119][CORE][WEBUI]Task summary table shou...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236092947 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) --- End diff -- I get it, but it seems like there was a particular reason for using an iterator and skipping, and I'm not sure we can just undo it. If this is only to filter for "SUCCESS", that much seems easy enough in the original code with an 'if' statement somewhere, no? Similar code above still uses an iterator.
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087784 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks +.zipWithIndex +.filter(x => indices.contains(x._2)) + + if(quantileTasks.size > indices.length) { --- End diff -- Done
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087749 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala --- @@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } + test("only successfull task have taskSummary") { +val store = new InMemoryStore() +(0 until 5).foreach { i => store.write(failedTaskData(i)) } +val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) +assert(appStore.size === 0) + } + + test("summary should contain task metrics of only successfull tasks") { +val store = new InMemoryStore() +(0 until 5).foreach { i => + if (i % 2 == 1) store.write(failedTaskData(i)) + else store.write(newTaskData(i)) +} +val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get + +val values = (0 to 2).map( i => --- End diff -- Done
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087731 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) --- End diff -- Yes, that was the original intent. Unfortunately, after converting to the Scala collection, the skip() functionality is no longer there. Also, the kvstore doesn't have any filter API to filter the "SUCCESS" tasks. The PR was for reducing the computational time for loading the stagePage from the diskStore (for the history server) by avoiding in-memory sorting.
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087746 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala --- @@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } + test("only successfull task have taskSummary") { +val store = new InMemoryStore() +(0 until 5).foreach { i => store.write(failedTaskData(i)) } +val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) +assert(appStore.size === 0) + } + + test("summary should contain task metrics of only successfull tasks") { +val store = new InMemoryStore() +(0 until 5).foreach { i => + if (i % 2 == 1) store.write(failedTaskData(i)) --- End diff -- Done
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087741 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks +.zipWithIndex +.filter(x => indices.contains(x._2)) + + if(quantileTasks.size > indices.length) { + quantileTasks.map(task => fn(task._1).toDouble).toIndexedSeq + } else { +indices.map( index => + fn(quantileTasks.filter(_._2 == index).head._1).toDouble).toIndexedSeq --- End diff -- Modified. Thanks
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236087736 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks --- End diff -- Done
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236085936 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) --- End diff -- This used to get an iterator. I wonder if it's safe to materialize the whole collection? It seems to take care to use the iterator to skip over things, for example.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236085919 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks --- End diff -- Why not check for "SUCCESS"?
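Presumably the prefix check works because, of the task statuses ("SUCCESS", "FAILED", "KILLED", "RUNNING" — an assumed vocabulary here), only one starts with 'S'. A quick sketch of why the exact comparison is the safer choice:

```scala
// Assumed status vocabulary; only "SUCCESS" begins with 'S' in it today.
val statuses = Seq("SUCCESS", "FAILED", "KILLED", "RUNNING", "SUCCESS")

val byPrefix = statuses.filter(_.startsWith("S"))
val byEquals = statuses.filter(_ == "SUCCESS")

// Equivalent for this vocabulary, but equality won't silently match a
// hypothetical future status that also happens to start with 'S'.
println(byPrefix == byEquals) // true
```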
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236086094 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala --- @@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } + test("only successfull task have taskSummary") { +val store = new InMemoryStore() +(0 until 5).foreach { i => store.write(failedTaskData(i)) } +val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) +assert(appStore.size === 0) + } + + test("summary should contain task metrics of only successfull tasks") { +val store = new InMemoryStore() +(0 until 5).foreach { i => + if (i % 2 == 1) store.write(failedTaskData(i)) + else store.write(newTaskData(i)) +} +val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get + +val values = (0 to 2).map( i => --- End diff -- 2.0 is much clearer than 2.toDouble. But why not just write `Array(0.0, 2.0, 4.0)`?
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236085876 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks +.zipWithIndex +.filter(x => indices.contains(x._2)) + + if(quantileTasks.size > indices.length) { --- End diff -- The formatting needs adjustment around here: space after 'if', less indent on the next line, no space after (, etc.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236086033 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -222,29 +223,20 @@ private[spark] class AppStatusStore( val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - Utils.tryWithResource( -store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => -var last = Double.NaN -var currentIdx = -1L -indices.map { idx => - if (idx == currentIdx) { -last - } else { -val diff = idx - currentIdx -currentIdx = idx -if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last -} else { - Double.NaN -} - } -}.toIndexedSeq + val quantileTasks = store.view(classOf[TaskDataWrapper]) +.parent(stageKey) +.index(index) +.first(0L) +.asScala +.filter(_.status.startsWith("S")) // Filter "SUCCESS" tasks +.zipWithIndex +.filter(x => indices.contains(x._2)) + + if(quantileTasks.size > indices.length) { + quantileTasks.map(task => fn(task._1).toDouble).toIndexedSeq + } else { +indices.map( index => + fn(quantileTasks.filter(_._2 == index).head._1).toDouble).toIndexedSeq --- End diff -- .filter + .head = .find? It is more efficient. Why not create a Map, though, if this needs to be accessed by key?
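A sketch of the three lookup styles over (task, rank) pairs — a hypothetical `T` and made-up values stand in for `TaskDataWrapper` and the zipped indices:

```scala
case class T(runTime: Long)

val ranked  = Seq(T(3L) -> 0, T(8L) -> 2, T(21L) -> 4) // (task, rank) pairs
val indices = Seq(0, 2, 4)

// filter(...).head scans the whole collection even after a match is found.
val viaFilter = indices.map(i => ranked.filter(_._2 == i).head._1.runTime)

// find(...) stops at the first match.
val viaFind = indices.map(i => ranked.find(_._2 == i).get._1.runTime)

// A Map makes each lookup an (expected) constant-time access by key.
val byRank = ranked.map { case (t, r) => r -> t }.toMap
val viaMap = indices.map(i => byRank(i).runTime)

println(viaFilter == viaFind && viaFind == viaMap) // true
```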
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r236086063 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala --- @@ -77,6 +77,30 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } + test("only successfull task have taskSummary") { +val store = new InMemoryStore() +(0 until 5).foreach { i => store.write(failedTaskData(i)) } +val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles) +assert(appStore.size === 0) + } + + test("summary should contain task metrics of only successfull tasks") { +val store = new InMemoryStore() +(0 until 5).foreach { i => + if (i % 2 == 1) store.write(failedTaskData(i)) --- End diff -- Braces for if-else. I also personally think a for loop is more obvious here: `for (i <- 0 until 5) {`
GitHub user shahidki31 reopened a pull request: https://github.com/apache/spark/pull/23088 [SPARK-26119][CORE][WEBUI]Task summary table should contain only successful tasks' metrics ## What changes were proposed in this pull request? Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of Spark. ## How was this patch tested? Added UT. Attached screenshots. Before patch: ![screenshot from 2018-11-20 00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png) ![screenshot from 2018-11-20 01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shahidki31/spark summaryMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23088.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23088 commit cfe2d5f3744f2d10d917883713cd78678b5157a1 Author: Shahid Date: 2018-11-19T18:23:57Z task summary should contain only successful tasks commit 8f4498e0c67f8a83401f3b0e06aef4922ef49c20 Author: Shahid Date: 2018-11-21T18:37:01Z update PR
Github user shahidki31 commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r235028381 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -238,8 +239,16 @@ private[spark] class AppStatusStore( val diff = idx - currentIdx --- End diff -- Hi @gengliangwang I will update the PR. Actually there are more changes required to fix this. Thanks
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/23088#discussion_r235027227 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -238,8 +239,16 @@ private[spark] class AppStatusStore( val diff = idx - currentIdx --- End diff -- `diff` could be negative here?
Github user shahidki31 closed the pull request at: https://github.com/apache/spark/pull/23088
GitHub user shahidki31 opened a pull request: https://github.com/apache/spark/pull/23088 [SPARK-26119][CORE][WEBUI]Task summary table should contain only successful tasks' metrics ## What changes were proposed in this pull request? Task summary table in the stage page currently displays the summary of all the tasks. However, we should display the task summary of only successful tasks, to follow the behavior of previous versions of Spark. ## How was this patch tested? Added UT. Attached screenshots. Before patch: ![screenshot from 2018-11-20 00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png) ![screenshot from 2018-11-20 01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shahidki31/spark summaryMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23088.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23088 commit cfe2d5f3744f2d10d917883713cd78678b5157a1 Author: Shahid Date: 2018-11-19T18:23:57Z task summary should contain only successful tasks