[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 Thanks a lot for comments. I refined accordingly : ) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @mridulm Thanks a lot for the comments. I refined accordingly. (btw, the time complexity of `rebalance` in `MedianHeap` is O(1).)
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @mridulm Thanks a lot for your comments. I previously ran a test with `TreeSet` and 100k tasks, measuring the time spent on insertion. The results are: 372ms, 362ms, 458ms, 429ms, 363ms, which I think might be acceptable, though the time complexity is O(log n). It would be great if you could give more advice; I'm glad to try more ideas and do measurements :)
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r104344524

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -754,7 +743,6 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
-    successfulTaskIdsSet -= tid
--- End diff --

Removing a duration from `successfulTaskDurations` is quite time-consuming (O(n)) now. We only use `successfulTaskDurations` to generate the median duration, so I hesitate to do the removal.
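The O(n) removal mentioned above can be sketched as follows. This is a hypothetical stand-alone example, not Spark code: `scala.collection.mutable.PriorityQueue` offers no O(log n) delete-by-value, so removing an arbitrary element means filtering and rebuilding the heap.

```scala
import scala.collection.mutable

// Hypothetical sketch: removing an arbitrary element from a mutable
// PriorityQueue requires walking every element and rebuilding the heap, O(n).
object RemoveCost {
  def removeOne(xs: Seq[Int], x: Int): Int = {
    val heap = mutable.PriorityQueue(xs: _*)
    // Rebuild the heap without `x` -- this visits every element: O(n).
    val remaining = mutable.PriorityQueue(heap.toSeq.filterNot(_ == x): _*)
    remaining.dequeue() // the new maximum
  }

  def main(args: Array[String]): Unit = {
    println(removeOne(Seq(5, 1, 4, 2, 3), 3)) // prints 5
  }
}
```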
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r104344274

--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that their
+ * sizes will not differ by more than 1. Therefore each time findMedian() is
+ * called we check if the two heaps have the same size. If they do, we return the
+ * average of the two top values of the heaps. Otherwise we return the top of the heap
+ * which has one more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. the median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. the median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than the current median, it should be inserted into
+      // the minHeap, otherwise the maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

I think the average value is the definition of 'median' when there is an even number of elements. Maybe it's better to keep it as `(minHeap.head + maxHeap.head) / 2.0`?
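The two-heap idea quoted above can be sketched as a self-contained example (names are illustrative, not the actual Spark class; the insert/rebalance policy is one plausible variant of what the diff describes):

```scala
import scala.collection.mutable

// Compact sketch of a two-heap median structure: maxHeap holds the smaller
// half (maximum on top), minHeap holds the larger half (minimum on top).
class TwoHeapMedian {
  private val maxHeap = mutable.PriorityQueue.empty[Double]                            // smaller half
  private val minHeap = mutable.PriorityQueue.empty[Double](Ordering[Double].reverse)  // larger half

  def insert(x: Double): Unit = {
    if (maxHeap.isEmpty || x <= maxHeap.head) maxHeap.enqueue(x) else minHeap.enqueue(x)
    // Keep the size difference at most 1 so the median stays at the roots.
    if (maxHeap.size - minHeap.size > 1) minHeap.enqueue(maxHeap.dequeue())
    else if (minHeap.size - maxHeap.size > 1) maxHeap.enqueue(minHeap.dequeue())
  }

  def findMedian(): Double = {
    if (maxHeap.isEmpty && minHeap.isEmpty) throw new NoSuchElementException("empty")
    if (maxHeap.size == minHeap.size) (maxHeap.head + minHeap.head) / 2.0
    else if (maxHeap.size > minHeap.size) maxHeap.head
    else minHeap.head
  }
}

object TwoHeapMedianDemo {
  def main(args: Array[String]): Unit = {
    val m = new TwoHeapMedian
    Seq(3.0, 1.0, 4.0, 1.0, 5.0).foreach(m.insert)
    println(m.findMedian()) // median of {1, 1, 3, 4, 5} -> 3.0
  }
}
```

With an even count the two roots are averaged, matching the comment above about the definition of the median.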
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r104344072

--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+    val medianHeap = new MedianHeap()
+    var valid = false
+    try {
+      medianHeap.findMedian()
+    } catch {
+      case e: NoSuchElementException =>
+        valid = true
+    }
+
+    assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is odd or even") {
+    val random = new Random()
+    val medianHeap1 = new MedianHeap()
+    val array1 = new Array[Int](100)
+    (0 until 100).foreach {
+      case i =>
+        val randomNumber = random.nextInt(1000)
+        medianHeap1.insert(randomNumber)
+        array1(i) += randomNumber
+    }
+    util.Arrays.sort(array1)
+    assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+    val medianHeap2 = new MedianHeap()
+    val array2 = new Array[Int](101)
+    (0 until 101).foreach {
+      case i =>
+        val randomNumber = random.nextInt(1000)
+        medianHeap2.insert(randomNumber)
+        array2(i) += randomNumber
+    }
+    util.Arrays.sort(array2)
+    assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of Median should be correct though there are duplicated numbers inside.") {
--- End diff --

`random.nextInt(100)` returns a value between 0 (inclusive) and the specified value (exclusive). If I call it 1000 times, there must be duplicates.
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Yes, some machine learning jobs in my cluster that do a cartesian product have more than 100k tasks in the `TaskSetManager`.
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @squito Thanks a lot for your comments, really helpful :) I think the median heap is a good idea. `slice` is `O(n)` and not the most efficient. I'm working on an implementation of `MedianHeap`, which provides `add`, `remove` and `getMedian` methods. Yes, for both `TreeSet` and `MedianHeap`, insertion in `handleSuccessfulTask` is `O(log n)`. I did a test (https://github.com/apache/spark/pull/17112) to measure the *sum of time* cost by `TreeSet.add`. There are 100k tasks in this test. Results are: 372ms, 362ms, 458ms, 429ms, 363ms. Thus I think the `O(log n)` time complexity will not be a problem.
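The "sum of time for 100k insertions" measurement described above has roughly this shape (a hypothetical sketch, not the code in #17112; the (duration, taskId) pair keeps `TreeSet` entries unique even when durations collide):

```scala
import scala.collection.mutable
import scala.util.Random

// Rough micro-benchmark shape: total wall time of n TreeSet insertions.
object InsertTiming {
  def timeInsertions(n: Int): (Long, Int) = {
    val durations = mutable.TreeSet.empty[(Long, Long)]
    val start = System.nanoTime()
    (0 until n).foreach { tid =>
      durations += ((Random.nextInt(10000).toLong, tid.toLong))
    }
    val elapsedMs = (System.nanoTime() - start) / 1000000
    (elapsedMs, durations.size)
  }

  def main(args: Array[String]): Unit = {
    val (ms, size) = timeInsertions(100000)
    println(s"$size insertions took ${ms}ms")
  }
}
```

A real benchmark would also warm up the JVM and repeat the run several times, as the numbers quoted above were.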
[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17133#discussion_r104273530

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -75,6 +75,8 @@ class TaskInfo(
   }

   private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
--- End diff --

Yes, good idea.
[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17133#discussion_r104161512

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -75,6 +75,8 @@ class TaskInfo(
   }

   private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+    // finishTime should be set larger than 0, otherwise "finished" below will return false.
+    assert(time != 0)
--- End diff --

Yes, I want to assert that `time > 0`.
[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/17133#discussion_r104066996

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -695,7 +695,8 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markFinished(TaskState.FINISHED)
+    // Mark task as finished. Note that finishing time here should be bigger than 0.
--- End diff --

Thanks for the review :) What I want to say is that the `finishTime` of `TaskInfo` should be set larger than 0, otherwise `TaskInfo.finished` will return false. I removed the comment and added an `assert` inside `TaskInfo`.
[GitHub] spark issue #17133: [SPARK-19793] Use clock.getTimeMillis when mark task as ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17133 I found this when doing https://github.com/apache/spark/pull/17112, which measures the approach I proposed in https://github.com/apache/spark/pull/16867.
[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17133

[SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager.

## What changes were proposed in this pull request?

`TaskSetManager` currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so developers cannot set a task's finishing time in unit tests. In `handleSuccessfulTask`, a task's duration = `System.currentTimeMillis` - launchTime (which can be set via `clock`), so the result is not correct.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-19793

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17133.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17133

commit 56cd922e562ac1aa7a97df01cc16f38a8b3bf250
Author: jinxing
Date: 2017-03-02T09:33:53Z

    [SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager.
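The testability problem described above can be sketched with simplified stand-ins (these are hypothetical classes, not Spark's actual `Clock`/`TaskInfo`): with an injected manual clock the test controls both launch and finish times, so the computed duration is deterministic.

```scala
// Minimal sketch of clock injection for deterministic finish times.
trait Clock { def getTimeMillis(): Long }

class ManualClock(private var now: Long) extends Clock {
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

class FakeTaskInfo(val launchTime: Long) {
  var finishTime: Long = 0
  def markFinished(time: Long): Unit = { assert(time > 0); finishTime = time }
  def duration: Long = finishTime - launchTime
}

object ClockDemo {
  def run(): Long = {
    val clock = new ManualClock(1000L)
    val info = new FakeTaskInfo(clock.getTimeMillis())
    clock.advance(250L)
    info.markFinished(clock.getTimeMillis()) // deterministic, unlike System.currentTimeMillis
    info.duration
  }

  def main(args: Array[String]): Unit = println(run()) // prints 250
}
```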
[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17111 @kayousterhout Thanks for merging. (btw, I made some measurements for https://github.com/apache/spark/pull/16867 SPARK-16929, please take a look when you have time :) )
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 I added a measurement for this pr in #17112. Results are as below; `newAlgorithm` indicates whether we use `TreeSet` to get the median duration or not, and `time cost` is the time used to get the median duration of the successful tasks. I did the measurement several times. `tasksNum` indicates how many tasks are in the `TaskSetManager`.

If `tasksNum=1000`:

| newAlgorithm | time cost |
| -- | -- |
| false | 5ms, 3ms, 4ms, 3ms, 3ms |
| true | 2ms, 4ms, 2ms, 2ms, 3ms |

If `tasksNum=10`:

| newAlgorithm | time cost |
| -- | -- |
| false | 107ms, 109ms, 103ms, 100ms, 107ms |
| true | 17ms, 14ms, 14ms, 13ms, 14ms |

If `tasksNum=15`:

| newAlgorithm | time cost |
| -- | -- |
| false | 133ms, 146ms, 127ms, 163ms, 114ms |
| true | 14ms, 13ms, 15ms, 16ms, 14ms |

As we can see, the new algorithm (`TreeSet`) performs better than the old algorithm (`Arrays.sort`). When `tasksNum=10`, `Arrays.sort` costs over 100ms every time, while with the new algorithm every run is below 20ms.
[GitHub] spark issue #17112: Measurement for SPARK-16929.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17112 The unit test "Measurement for SPARK-16929." added is the measurement. In TaskSetManagerSuite.scala line 1049, if `newAlgorithm=true`, `successfulTaskIdsSet` will be used to get the median duration; if `newAlgorithm=false`, the old algorithm (`Arrays.sort`) will be used. I calculate the time used for getting the median duration in TaskSetManager.scala line 957.

If `tasksNum=1000` (TaskSetManagerSuite.scala line 1043), the measurement is as below:

| newAlgorithm | time cost |
| -- | -- |
| false | 5ms, 3ms, 4ms, 3ms, 3ms |
| true | 2ms, 4ms, 2ms, 2ms, 3ms |

If `tasksNum=10`:

| newAlgorithm | time cost |
| -- | -- |
| false | 107ms, 109ms, 103ms, 100ms, 107ms |
| true | 17ms, 14ms, 14ms, 13ms, 14ms |

If `tasksNum=15`:

| newAlgorithm | time cost |
| -- | -- |
| false | 133ms, 146ms, 127ms, 163ms, 114ms |
| true | 14ms, 13ms, 15ms, 16ms, 14ms |

As we can see, the new algorithm (`TreeSet`) performs better than the old algorithm (`Arrays.sort`). When `tasksNum=10`, `Arrays.sort` costs over 100ms every time, while with the new algorithm every run is below 20ms.
[GitHub] spark pull request #17112: Measurement for SPARK-16929.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17112

Measurement for SPARK-16929.

## What changes were proposed in this pull request?

This pr doesn't target for merging. It's a measurement for https://github.com/apache/spark/pull/16867, which stores successful task ids in `successfulTaskIdsSet` (a `TreeSet`), so the time complexity is O(n/2) when getting the median duration in `checkSpeculatableTasks`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-16929-measurement

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17112.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17112

commit 6825bd741933df9bc25b1dd87080b3edfd2a3e30
Author: jinxing
Date: 2017-03-01T02:50:19Z

    Measurement for SPARK-16929.
[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17111 @squito Thanks a lot :)
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @squito It's great to open a new jira for this change. Please take a look at https://github.com/apache/spark/pull/17111.
[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/17111 cc @kayousterhout @squito
[GitHub] spark pull request #17111: [SPARK-19777] Scan runningTasksSet when check spe...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/17111

[SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.

## What changes were proposed in this pull request?

When checking speculatable tasks in `TaskSetManager`, only scan `runningTasksSet` instead of scanning all `taskInfos`.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-19777

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17111.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17111

commit 819a5e8f5f44bb41e3e401b3b68689509184e580
Author: jinxing
Date: 2017-02-28T07:40:57Z

    [SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r103391138

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
       logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
       if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
         val time = clock.getTimeMillis()
-        val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
-        Arrays.sort(durations)
+        val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
         val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
         val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
         // TODO: Threshold should also look at standard deviation of task durations and have a lower
         // bound based on that.
         logDebug("Task length threshold for speculation: " + threshold)
-        for ((tid, info) <- taskInfos) {
+        for (tid <- runningTasksSet) {
+          val info = taskInfos(tid)
--- End diff --

@kayousterhout Thanks a lot for your comments :) I will keep this simple change in this pr. For the time complexity improvement, I will make another pr and try to add some measurements.
[GitHub] spark issue #16989: [WIP][SPARK-19659] Fetch big blocks to disk when shuffle...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 @squito I've uploaded a design doc to jira, please take a look when you have time :)
[GitHub] spark issue #16867: [WIP][SPARK-16929] Improve performance when check specul...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @squito Thanks a lot for your comments : )

> When check speculatable tasks in TaskSetManager, current code scan all task infos and sort durations of successful tasks in O(NlogN) time complexity.

`checkSpeculatableTasks` is scheduled every 100ms by `scheduleAtFixedRate` (not `scheduleWithFixedDelay`), so the interval can be less than 100ms. In my cluster (yarn-cluster mode), if the size of the task set is over 30 and the driver is running on a machine with poor cpu performance, the `Arrays.sort` can easily take more than 100ms. Since `checkSpeculatableTasks` synchronizes on `TaskSchedulerImpl`, I suspect that's why my driver hangs. I get the median duration by `TreeSet.slice`, which comes from `IterableLike` and unluckily cannot jump to the mid position, so the time complexity is O(n) in this pr. I could get the mid position by reflection, but I don't want to do that; I think it would be harmful for code clarity.
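The `slice`-based median lookup mentioned above looks roughly like this (a hypothetical sketch, not the PR's code): `slice` is implemented on the iterator-based collection API, so reaching the middle element walks from the smallest entry, which is O(n) rather than O(log n).

```scala
import scala.collection.immutable.TreeSet

// Sketch: fetching the median from a sorted set via slice -- correct, but
// the iterator must traverse roughly half the set to reach the midpoint.
object SliceMedian {
  def median(durations: TreeSet[Long]): Long = {
    val mid = durations.size / 2
    durations.slice(mid, mid + 1).head // O(n) traversal to index `mid`
  }

  def main(args: Array[String]): Unit = {
    // Iteration order is ascending: 80, 90, 120, 150, 200.
    println(median(TreeSet(120L, 80L, 200L, 150L, 90L))) // prints 120
  }
}
```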
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 @squito Thanks a lot for your comments : ) Yes, there should be a design doc for discussion. I will prepare one and post a pdf to jira.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16901
[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16901 @kayousterhout I'll close since this functionality is already tested.
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16867 @kayousterhout @squito Would you mind taking a look at this when you have time?
[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16790 @srowen @vanzin Thanks a lot for the work on this ~
[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16989 @vanzin @squito Would you mind taking a look at this when you have time?
[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16989 [SPARK-19659] Fetch big blocks to disk when shuffle-read. ## What changes were proposed in this pull request? Currently the whole block is fetched into memory (off heap by default) during shuffle-read. A block is identified by (shuffleId, mapId, reduceId), so it can be large in skew situations. If OOM happens during shuffle read, the job is killed and users are told to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM, but that approach is not well suited to production environments, especially data warehouses: using Spark SQL as the data engine, users want a unified parameter (e.g. memory) with less resource waste (resources allocated but not used). It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM. Fetching big blocks to disk is mentioned in SPARK-3019. In this pr, in `sendRequest` in `ShuffleBlockFetcherIterator`, check whether `bytesInFlight` is over `maxBytesInFlight`; if so, fetch remote blocks to disk by sending a `StreamRequest`. By setting `spark.reducer.maxSizeInFlight` properly, users can avoid OOM without causing performance degradation. ## How was this patch tested? Not added yet. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19659 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16989.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16989 commit 21f6da36b127956bf35da088f1ecfeb55b307f3e Author: jinxing Date: 2017-02-18T15:00:50Z [SPARK-19659] Fetch big blocks to disk when shuffle-read.
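The core decision described above is a simple threshold check. A hedged sketch follows; the names mirror the description (`bytesInFlight`, `maxBytesInFlight`) but this is illustrative code, not the actual `ShuffleBlockFetcherIterator` logic:

```scala
// Illustrative sketch: once the bytes already being fetched exceed the
// spark.reducer.maxSizeInFlight budget, further remote blocks would be
// streamed to disk instead of buffered in memory.
sealed trait FetchTarget
case object Memory extends FetchTarget
case object Disk extends FetchTarget

def fetchTarget(bytesInFlight: Long, maxBytesInFlight: Long): FetchTarget =
  if (bytesInFlight > maxBytesInFlight) Disk else Memory
```

The design choice here is that memory stays the fast path; disk is only used under pressure, so well-behaved jobs see no performance change while skewed ones degrade gracefully instead of OOMing.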
[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16790 Both `askSync` and `askWithRetry` are blocking; the only difference is the "retry" (3 times by default) when the rpc fails. Callers of this method do not necessarily rely on this "retry".
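To make the difference concrete, here is an illustrative sketch of the retry behavior, not the actual `RpcEndpointRef` implementation: `askWithRetry` re-issues a failed blocking ask up to a maximum number of attempts, while `askSync` corresponds to a single attempt.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch: a blocking ask retried up to maxRetries times.
// `askSync` would be the maxRetries = 1 case; `ask` stands in for the rpc call.
def askWithRetrySketch[T](maxRetries: Int)(ask: () => T): T = {
  var lastError: Throwable = new IllegalStateException("no attempts made")
  for (_ <- 1 to maxRetries) {
    Try(ask()) match {
      case Success(v) => return v
      case Failure(e) => lastError = e
    }
  }
  throw lastError
}
```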
[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16790 https://github.com/apache/spark/pull/16690#discussion_r101616883 causes the build to produce lots of deprecation warnings. @srowen @vanzin What do you think about this?
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 @srowen What do you think about https://github.com/apache/spark/pull/16790?
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout @squito @markhamstra Thanks for all of your work on this patch. Really appreciate your help : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 Yes, refined : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Thanks a lot. I've refined the comment, please take another look.
[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16901 @squito Thanks a lot for your comments. I've refined the comment.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100968529 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } + test("After fetching failed, success of old attempt of stage should be taken as valid.") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA)) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB)) + +submit(rddC, Array(0, 1)) +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostA", 2)))) + +// Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1) +complete(taskSets(1), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, +"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null), + (Success, makeMapStatus("hostB", 2)))) + +scheduler.resubmitFailedStages() +assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1) +complete(taskSets(2), Seq( + (Success, makeMapStatus("hostB", 2)), + (Success, makeMapStatus("hostB", 2)))) + +assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1) +runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) + +// Thanks to the success from old attempt of stage(stageId=1), there's no pending --- End diff -- Yes, the success should be moved. Sorry about this; I'll rectify it.
[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16901 @kayousterhout I've refined accordingly. Sorry for the stupid mistake I made. Please take another look at this : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout I've refined accordingly, please take another look : )
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r100953546 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } + test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," + +" even with late completions from earlier stage attempts") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostA", 2)))) + +// Fetch failed on hostA. +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, +"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), + null)) + +scheduler.resubmitFailedStages() + +assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1) +complete(taskSets(2), Seq( + (Success, makeMapStatus("hostB", 2)), + (Success, makeMapStatus("hostB", 2)))) + +// Task succeeds on a failed executor. The success is bogus. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2))) + +assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1) +runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) + +// There should be no new attempt of stage submitted.
+assert(taskSets.size === 4) --- End diff -- Yes, I think so : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout Thanks a lot for the clear explanation. It makes great sense to me and helps me understand the logic a lot. I also think this way of testing is very good and makes the code very clear. I've already refined this pr; please take a look when the tests pass. Also, following your explanation above in >Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output). ... I made #16901 to add a test that a success from an old attempt of a stage should be taken as valid and the corresponding pending partition removed. Please take a look.
[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16901 @kayousterhout @squito @markhamstra As mentioned in #16620, I think it might make sense to make this pr. Please take a look. If you think it is too trivial, I will close it.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16901 [SPARK-19565] Improve DAGScheduler tests. ## What changes were proposed in this pull request? This is related to #16620. When a fetch fails, the stage is resubmitted, and there can be running tasks from both the old and new stage attempts. This pr adds a test checking that successes of tasks from an old stage attempt are taken as valid and the corresponding partitionIds are removed from the stage's pendingPartitions accordingly. When pendingPartitions is empty, the downstream stage can be scheduled, even though there are still running tasks in the active (new) stage attempt. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19565 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16901.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16901 commit f5df21405f1eb31c7f32ffd7b32ed02abbc6d033 Author: jinxing Date: 2017-02-12T12:39:53Z [SPARK-19565] Improve DAGScheduler tests.
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16876 @kayousterhout It's great to give a definition of `pendingPartitions` in `ShuffleMapStage`. May I ask a question to make my understanding of `pendingPartitions` clear? It means: 1. A partitionId gets removed from `pendingPartitions` only because of a real success (not including ones from failed executors); 2. `pendingPartitions` is the exact mirror (in reverse) of the `ShuffleMapStage`'s output locations, thus when `pendingPartitions` is empty, all output locations are available; 3. When an executor is lost, we should check whether partitionIds need to be added back to `pendingPartitions` accordingly; 4. There is no need to consider resubmitting the stage when `pendingPartitions` is empty. Is my understanding correct? It'd be great if you could correct any points I have wrong.
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @kayousterhout Thanks a lot. Sorry for this and I'll be careful in the future.
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16876 It's great to have pendingPartitions in ShuffleMapStage.
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16867 [SPARK-16929] Improve performance when check speculatable tasks. ## What changes were proposed in this pull request? When checking speculatable tasks in `TaskSetManager`, the current code scans all task infos and sorts the durations of successful tasks, in O(NlogN) time complexity. Since `TaskSchedulerImpl`'s synchronized lock is acquired during the checking process, this can cause performance degradation when checking a large-scale task set, say hundreds of thousands of tasks. This change uses a `TreeSet` to cache the successful task infos and compares the median duration with running tasks, avoiding scanning all task infos. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-16929 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16867.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16867 commit 1169d118662a9bfdabe88238352fe834a28aee14 Author: jinxing Date: 2017-02-07T02:35:10Z [SPARK-16929] Improve performance when check speculatable tasks.
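The check described above can be sketched roughly as follows, with illustrative names rather than the PR's actual code: successful task durations are kept in a sorted set, and a running task is flagged as speculatable once its elapsed time exceeds the median duration times a multiplier. Note that `TreeSet.slice` walks from the head, so the median lookup itself is still O(n), a point raised elsewhere in this thread.

```scala
import scala.collection.immutable.TreeSet

// Hypothetical sketch: durations of successful tasks are cached in a
// TreeSet; running tasks slower than median * multiplier are speculatable.
def speculatableTasks(successDurations: TreeSet[Long],
                      runningElapsed: Map[Int, Long],
                      multiplier: Double): Set[Int] = {
  if (successDurations.isEmpty) return Set.empty
  // slice(n/2, n/2 + 1) walks from the head of the sorted set: O(n).
  val mid = successDurations.size / 2
  val median = successDurations.slice(mid, mid + 1).head
  val threshold = median * multiplier
  runningElapsed.filter { case (_, elapsed) => elapsed > threshold }.keySet
}
```

The gain over the old code is on the insert side: maintaining the sorted set incrementally on each task success replaces the full scan-and-sort on every 100ms check.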
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @squito Many thanks for your help. You are such a kind person : )
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @kayousterhout thanks a lot. I'm not sure how to start the unit tests automatically; do I have the permission to do that? BTW, may I ask a question: what is the proper way to run the unit tests locally? Currently, I run a single unit test with `build/mvn test -DwildcardSuites=org.apache.spark.**Suite` and run all unit tests with `build/mvn -Pyarn -Phadoop-2.6 -Phive-thriftserver -Dhadoop.version=2.6.5 test`. Is that correct? It'd be great if you could help me out or point me to a wiki I can refer to :)
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @markhamstra @squito @kayousterhout It would be great if you could give more comments on the above so I can continue working on this : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 As @squito mentioned: >Before this, the DAGScheduler didn't really know anything about taskSetManagers. (In its current form, this pr uses a "leaked" handle via rootPool.getSortedTaskSetQueue). Is adding it here a mistake? An alternative would be to add a method to TaskScheduler like markTaskSetsForStageAsZombie(stageId: Int). But that is still basically exposing the idea of "zombie" tasksets to the dagscheduler, I dunno if its actually any cleaner. I think this is a cleaner and simpler way to fix this bug, and we can avoid adding TSM info to the DAGScheduler.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout @squito @markhamstra Thanks a lot for reviewing this pr thus far. I do think the approach, which throws away task results from earlier attempts that were running on executors that failed, and takes `stage.pendingPartitions` as an exact mirror (in reverse) of the output locations, can really fix this bug and make the code quite clear. But my previous understanding of `stage.pendingPartitions` is a little different, as commented in `Stage` as below: ``` /** * Partitions the [[DAGScheduler]] is waiting on before it tries to mark the stage / job as * completed and continue. Tasks' successes in both the active taskset or earlier attempts * for this stage can cause partition ids get removed from pendingPartitions. Finally, note * that when this is empty, it does not necessarily mean that stage is completed -- Some of * the map output from that stage may have been lost. But the [[DAGScheduler]] will check for * this condition and resubmit the stage if necessary. */ ``` Any task's success can result in its partition being removed from `pendingPartitions`, no matter whether it comes from a valid executor or a failed one. Thus when `pendingPartitions` becomes empty, we can check whether the stage's output locations are all available, and resubmit if not. If we take `stage.pendingPartitions` as an exact mirror (in reverse) of the output locations, some unit tests in DAGSchedulerSuite cannot pass (e.g. `"run trivial shuffle with out-of-band failure and retry"`). Think about the scenario below: 1. A stage has ShuffleMapTask1 and ShuffleMapTask2, `pendingPartitions`=(0, 1) 2. ShuffleMapTask1 succeeded on executorA and its result returned to the driver, `pendingPartitions`=(1) 3. ShuffleMapTask2 succeeded on executorA; 4. The driver heard that executorA was lost; 5. ShuffleMapTask2's success returned to the driver, but was thrown away, so still `pendingPartitions`=(1) and the stage cannot get rescheduled.
In my understanding, `pendingPartitions` helps us track the progress of the `TaskSetManager`, know whether there are still task results on the way that are worth waiting for, and decide when to check whether the output locations are all available and whether to resubmit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
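The five steps above can be modeled with a minimal standalone sketch. This is illustrative bookkeeping only, not Spark's actual `DAGScheduler` state; the executor name and partition ids are made up:

```scala
// Minimal standalone model of the scenario; not Spark's actual classes.
import scala.collection.mutable

val pendingPartitions = mutable.HashSet(0, 1)      // step 1: two map tasks pending
val outputLocs = mutable.HashMap[Int, String]()    // partition -> executor holding its output

// Step 2: ShuffleMapTask1 succeeds on executorA and its result reaches the driver.
pendingPartitions -= 0
outputLocs(0) = "executorA"

// Step 4: the driver learns executorA is lost, so its output locations are invalidated.
val lost = outputLocs.collect { case (p, e) if e == "executorA" => p }.toList
lost.foreach(outputLocs.remove)

// Step 5: ShuffleMapTask2's success arrives from the now-lost executorA. If results
// from failed executors are discarded (the mirror-of-output-locations semantics),
// partition 1 stays pending and nothing is left running to complete the stage:
assert(pendingPartitions == Set(1) && outputLocs.isEmpty)
```

Under the `Stage` comment's semantics, step 5 would instead remove partition 1, the set would go empty, and the `DAGScheduler` would notice the missing output and resubmit.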
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @kayousterhout Thanks a lot for the review. I've refined it accordingly.
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @squito Thanks a lot for review.
[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16831 @kayousterhout @squito This was originally raised by @squito while reviewing https://github.com/apache/spark/pull/16620. Sorry for being hasty in opening this small PR.
[GitHub] spark pull request #16831: [SPARK-19263] Fix race in SchedulerIntegrationSui...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16831 [SPARK-19263] Fix race in SchedulerIntegrationSuite. ## What changes were proposed in this pull request? The whole process of offering resources and generating `TaskDescription`s should be guarded by `taskScheduler.synchronized` in `reviveOffers`; otherwise there is a race condition. ## How was this patch tested? Existing unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19263-FixRaceInTest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16831.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16831 commit 05ce9446d386b2ad7827c2ca9f1cb38bf343e9a7 Author: jinxing Date: 2017-02-07T11:25:31Z [SPARK-19263] fix race in SchedulerIntegrationSuite.
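The intent can be sketched as follows. This is a simplified stand-in, not the suite's real code; `MockScheduler` and the core count are made up for illustration:

```scala
// Hypothetical sketch: offering resources and generating TaskDescriptions under
// one lock, so two concurrent reviveOffers calls cannot assign the same cores twice.
object MockScheduler                      // stands in for taskScheduler
var freeCores = 2

def reviveOffers(): Seq[String] = MockScheduler.synchronized {
  // Both steps happen atomically: build offers from free resources...
  val launched = (1 to freeCores).map(i => s"TaskDescription-$i")
  // ...and consume those resources before releasing the lock.
  freeCores = 0
  launched
}

assert(reviveOffers().size == 2)   // first call launches on both cores
assert(reviveOffers().isEmpty)     // second call sees no free cores left
```

If the offer generation happened outside the lock, two threads could both observe `freeCores == 2` and hand out four task descriptions for two cores.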
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout @squito @markhamstra Thanks a lot for the comments. I've refined accordingly. I still have one concern: > If this is a correct description, I'd argue that (5) is the problem: that when ShuffleMapTask2 finishes, we should not be updating a bunch of state in the DAGScheduler saying that there's output ready as a result. If I'm understanding correctly, there's a relatively simple fix to this problem: In DAGScheduler.scala, in handleTaskCompletion, we should exit (and not update any state) when the task is from an earlier stage attempt that's not the current active attempt. This can be done by changing the if-statement on line 1141 to include: || stageIdToStage(task.stageId).latestInfo.attemptId != task.stageAttemptId With the above, are we ignoring all results from old stage attempts? As @squito mentioned: > It also can potentially improve performance, since you may submit downstream stages more quickly, rather than waiting for all tasks in the active taskset to complete. Wouldn't it be beneficial to keep counting the results from old stage attempts?
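The guard under discussion can be sketched in isolation. Names are simplified stubs; the real check lives in `DAGScheduler.handleTaskCompletion`:

```scala
// Simplified stand-ins for a completed task and the scheduler's attempt bookkeeping.
case class TaskStub(stageId: Int, stageAttemptId: Int)
val latestAttemptId = Map(0 -> 2)   // stage 0's current active attempt is 2

// The proposed condition: skip all state updates for tasks from non-active attempts.
def fromStaleAttempt(task: TaskStub): Boolean =
  latestAttemptId(task.stageId) != task.stageAttemptId

assert(fromStaleAttempt(TaskStub(stageId = 0, stageAttemptId = 1)))   // old attempt: ignored
assert(!fromStaleAttempt(TaskStub(stageId = 0, stageAttemptId = 2)))  // active attempt: processed
```

The trade-off raised in the comment is visible here: dropping every stale-attempt result is simple and safe, but it also discards map output that could have let downstream stages start sooner.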
[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 @kayousterhout Thanks a lot again : )
[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 @kayousterhout Thanks a lot for helping with this PR thus far. I think the proposal is quite clear. I've refined accordingly. Please take another look.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Thanks a lot for helping with this PR thus far. I've added a unit test in `DAGSchedulerSuite`, but I'm not sure it is exactly what you suggested. I created a `mockTaskSchedulerImpl`. Since a lot of state is maintained in `TaskSchedulerImpl`, I have to trigger the events via `resourceOffers`, `handleSuccessfulTask`, and `handleFailedTask`. Please take another look when you have time; I'd really appreciate your help.
[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 Thanks a lot for the comments. Actually I'm still not sure how to change this log, or whether to just remove it. I only think the log is confusing: it is printed on every FetchFailed. Please give some suggestions if possible. If this is too trivial to fix, should I close this PR?
[GitHub] spark pull request #16808: [SPARK-19461] Remove some unused imports.
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16808
[GitHub] spark pull request #16808: [SPARK-19461] Remove some unused imports.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16808 [SPARK-19461] Remove some unused imports. ## What changes were proposed in this pull request? Remove some unused imports in `CoarseGrainedSchedulerBackend` and `YarnSchedulerBackend`. ## How was this patch tested? No need to test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19461 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16808.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16808 commit 0db96a04b594f816edcddd2bd6e7e9a6e9d6c3a7 Author: jinxing Date: 2017-02-05T09:53:18Z [SPARK-19461] Remove some unused imports.
[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 I just changed the log message, but I'm not sure it is clear enough.
[GitHub] spark pull request #16807: [SPARK-19398] Change one misleading log in TaskSe...
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16807
[GitHub] spark pull request #16738: [SPARK-19398] Change one misleading log in TaskSe...
GitHub user jinxing64 reopened a pull request: https://github.com/apache/spark/pull/16738 [SPARK-19398] Change one misleading log in TaskSetManager. ## What changes were proposed in this pull request? The log below is misleading:
```
if (successful(index)) {
  logInfo(
    s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
}
```
On a fetch failure, the task is marked as successful in `TaskSetManager::handleFailedTask`, and then the log above is printed. Here `successful` only means the task will no longer be scheduled, not that it actually succeeded. ## How was this patch tested? Existing unit tests cover this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16738.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16738 commit 6ccd9f5cb6c03bad6a7a01104a95c277218ffa2d Author: jinxing Date: 2017-02-05T03:46:35Z [SPARK-19398] Change one misleading log in TaskSetManager.
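One possible rewording, as a sketch only. The variables here are stubs mimicking `TaskSetManager` fields, the values are made up, and the final message wording was settled during review, not here:

```scala
// Stub state mimicking TaskSetManager fields; values are illustrative only.
val successful = Array(true)
val index = 0
val (taskId, stageId, tid) = ("1.0", "0.0", 10L)

// "successful" really means "no longer needs scheduling", which includes tasks
// marked done after a FetchFailed; the reworded message says so explicitly.
val message =
  if (successful(index))
    s"Task $taskId in stage $stageId (TID $tid) failed, but the task will not be " +
      "re-executed (either because the task failed with a shuffle data fetch failure, " +
      "so the previous stage needs to be re-run, or because a different copy of the " +
      "task has already succeeded)."
  else ""

assert(message.contains("shuffle data fetch failure"))
```

The point of the rewording is that it no longer claims "another instance has already succeeded" when the real reason is a fetch failure.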
[GitHub] spark pull request #16807: [SPARK-19398] Change one misleading log in TaskSe...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16807 [SPARK-19398] Change one misleading log in TaskSetManager. ## What changes were proposed in this pull request? The log below is misleading:
```
if (successful(index)) {
  logInfo(
    s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
}
```
On a fetch failure, the task is marked as successful in `TaskSetManager::handleFailedTask`, and then the log above is printed. Here `successful` only means the task will no longer be scheduled, not that it actually succeeded. ## How was this patch tested? Existing unit tests cover this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16807.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16807 commit 6ccd9f5cb6c03bad6a7a01104a95c277218ffa2d Author: jinxing Date: 2017-02-05T03:46:35Z [SPARK-19398] Change one misleading log in TaskSetManager.
[GitHub] spark pull request #16738: [SPARK-19398] Change one misleading log in TaskSe...
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16738
[GitHub] spark pull request #16790: [SPARK-19450] Replace askWithRetry with askSync.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16790 [SPARK-19450] Replace askWithRetry with askSync. ## What changes were proposed in this pull request? `askSync` was added to `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068), and `askWithRetry` is marked as deprecated. As mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218): >askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it. Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all uses of it with `askSync`. ## How was this patch tested? This PR doesn't change code logic; existing unit tests cover it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19450 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16790.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16790 commit b575c55ed9901b4dcf5b3e977a55f135b02aa4b1 Author: jinxing Date: 2017-02-03T17:29:05Z [SPARK-19450] Replace askWithRetry with askSync.
[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 @kayousterhout Would you please take a look at this? It would be great if you could help review it : )
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Would you please take another look at this? Please give some advice if possible, and I can continue working on it : )
[GitHub] spark issue #16779: [SPARK-19437] Rectify spark executor id in HeartbeatRece...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16779 Thanks a lot for reviewing this.
[GitHub] spark issue #16779: [SPARK-19437] Rectify spark executor id in HeartbeatRece...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16779 @zsxwing Thanks a lot for reviewing this. Not sure why the test doesn't start automatically.
[GitHub] spark pull request #16780: [SPARK-19438] Both reading and updating executorD...
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16780
[GitHub] spark issue #16780: [SPARK-19438] Both reading and updating executorDataMap ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16780 Thanks a lot for looking into this~ @zsxwing You are right; my understanding of this was incorrect. `CoarseGrainedSchedulerBackend`'s `DriverEndpoint` is a `ThreadSafeRpcEndpoint`, so concurrent message processing is disabled. I'll close this PR.
[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 @srowen Thanks a lot. I'll refine : )
[GitHub] spark pull request #16780: [SPARK-19438] Both reading and updating executorD...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16780 [SPARK-19438] Both reading and updating executorDataMap should be guarded by CoarseGrainedSchedulerBackend.this.synchronized when handling RegisterExecutor. ## What changes were proposed in this pull request? Currently, when handling `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, `executorDataMap` is guarded by `CoarseGrainedSchedulerBackend.this.synchronized` only when updating, not when reading, which can make `numPendingExecutors` incorrect. The code looks like this:
```
if (executorDataMap.contains(executorId)) {
  executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  context.reply(true)
} else {
  ...
  CoarseGrainedSchedulerBackend.this.synchronized {
    executorDataMap.put(executorId, data)
    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }
    if (numPendingExecutors > 0) {
      numPendingExecutors -= 1
      logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
    }
  }
```
Consider SPARK-19437 and a scenario like this: an executor sends `RegisterExecutor` twice via `askWithRetry`, with a very small interval between the two attempts. Both of them could then take the `else` branch, so `numPendingExecutors` would be decremented twice and become incorrect. Currently, `askWithRetry` for `RegisterExecutor` only appears in some unit tests, but it makes sense to make the handling of `RegisterExecutor` more robust. **TO FIX** Guard `executorDataMap` with `CoarseGrainedSchedulerBackend.this.synchronized` for both reading and updating. ## How was this patch tested? The code logic is not changed, just manually tested. SPARK-19437 can also help verify this fix.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19438 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16780.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16780 commit 7bb3a438eff8c8ab9567a50ca59da2e179939e76 Author: jinxing Date: 2017-02-02T15:54:08Z [SPARK-19438] Both reading and updating executorDataMap should be guarded by CoarseGrainedSchedulerBackend.this.synchronized when handle RegisterExecutor.
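The fix can be sketched with a simplified model. This is not the real `CoarseGrainedSchedulerBackend`; the lock object, the stored data value, and the initial pending count are placeholders:

```scala
import scala.collection.mutable

object Lock                                   // stands in for CoarseGrainedSchedulerBackend.this
val executorDataMap = mutable.HashMap[String, String]()
var numPendingExecutors = 1

// The duplicate check and the bookkeeping update happen under one lock, so a
// retried RegisterExecutor cannot decrement numPendingExecutors a second time.
def handleRegisterExecutor(executorId: String): Boolean = Lock.synchronized {
  if (executorDataMap.contains(executorId)) {
    false                                     // duplicate registration rejected
  } else {
    executorDataMap.put(executorId, "data")
    if (numPendingExecutors > 0) numPendingExecutors -= 1
    true
  }
}

assert(handleRegisterExecutor("1"))           // first attempt registers
assert(!handleRegisterExecutor("1"))          // askWithRetry duplicate is a no-op
assert(numPendingExecutors == 0)              // decremented exactly once
```

With the read outside the lock, two near-simultaneous registrations for the same id could both miss the `contains` check and both decrement the counter.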
[GitHub] spark pull request #16779: [SPARK-19437] Rectify spark executor id in Heartb...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16779 [SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite. ## What changes were proposed in this pull request? In the current code in `HeartbeatReceiverSuite`, the executor ids are set as below:
```
private val executorId1 = "executor-1"
private val executorId2 = "executor-2"
```
An executorId is sent to the driver at registration as below:
```
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
    RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```
When `CoarseGrainedSchedulerBackend` receives `RegisterExecutor`, the executorId is compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    ...
    executorDataMap.put(executorId, data)
    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }
    ...
```
`executorId.toInt` throws a `NumberFormatException` for ids like `"executor-1"`. The unit test currently passes only because of `askWithRetry`: when the exception is caught, the RPC is retried, the duplicate registration then takes the `if` branch, and `true` is returned. **To fix** Rectify the executor ids and replace `askWithRetry` with `askSync`; refer to https://github.com/apache/spark/pull/16690 ## How was this patch tested?
This fix is for a unit test, so there is no need to add another one. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19437 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16779.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16779 commit c8403eafbf11837e315a5e5732d7e2e4ccdc673d Author: jinxing Date: 2017-02-02T14:14:35Z [SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.
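The failure mode described above, in isolation: a non-numeric executor id cannot be compared against `currentExecutorIdCounter` via `toInt`.

```scala
import scala.util.Try

// "executor-1".toInt throws NumberFormatException, which askWithRetry used to mask
// by retrying the RPC until the duplicate-registration branch replied true.
assert(Try("executor-1".toInt).isFailure)
// Purely numeric ids, as the rectified test uses, convert cleanly.
assert(Try("1".toInt).isSuccess)
```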
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 Thanks a lot for reviewing this PR~
[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 @srowen @jasonmoore2k Thanks a lot for reviewing this PR~ >Should successful and tasksSuccessful be renamed to completed and tasksCompleted? What do you think about the renaming above, plus removing the log below?
```
if (successful(index)) {
  logInfo(
    s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
}
```
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 @vanzin Thanks a lot for helping with this PR~ I've refined it~ Please take another look~
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Thanks a lot for continuing to review this~ Your comments are very helpful; thank you so much for your help~~ > when we encounter the condition where there are no pending partitions, but there is an active taskset -- we just mark that taskset as inactive That's a good idea, and it makes the code much clearer. I've made the change; please take another look.
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r98819685 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1212,8 +1223,9 @@ class DAGScheduler( clearCacheLocs() - if (!shuffleStage.isAvailable) { -// Some tasks had failed; let's resubmit this shuffleStage + if (!shuffleStage.isAvailable && noActiveTaskSetManager) { --- End diff -- Hrmm... yes, @squito, we shouldn't go to the else branch when the shuffleStage is not available but an active `TaskSetManager` exists.
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 I'm very sorry if this is a disturbance : ) @vanzin Thanks a lot for continuing to review this, and I'll be more patient : ) Sorry again~~
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r98488916 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -718,6 +703,21 @@ private[spark] class TaskSetManager( " because task " + index + " has already completed successfully") } maybeFinishTaskSet() +// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the +// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not +// "deserialize" the value when holding a lock to avoid blocking other threads. So we call +// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. +// Note: "result.value()" only deserializes the value when it's called at the first time, so +// here "result.value()" just returns the value and won't block other threads. +sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) +// Kill any other attempts for the same task (since those are unnecessary now that one +// attempt completed successfully). +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") + sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) +} --- End diff -- @squito Yes, it makes sense to move this part before `maybeFinishTaskSet()`; I will refine it.
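The deserialize-once behavior of `result.value()` discussed in the diff comment above can be sketched with a toy model (hypothetical Python names, not Spark's actual `DirectTaskResult`): the first call pays the deserialization cost, and any later call made while holding a lock is just a cached read.

```python
import pickle


class ToyTaskResult:
    """Toy model of a task result whose value() deserializes the payload
    only on the first call and caches it for every later call."""

    def __init__(self, value_bytes):
        self._value_bytes = value_bytes  # serialized payload
        self._value = None
        self._deserialized = False

    def value(self):
        # First call: deserialize (potentially expensive) and cache.
        # Later calls: return the cached object without any work.
        if not self._deserialized:
            self._value = pickle.loads(self._value_bytes)
            self._deserialized = True
        return self._value


# The result getter "warms" the value outside any scheduler lock...
result = ToyTaskResult(pickle.dumps([1, 2, 3]))
warmed = result.value()
# ...so a later call made under the lock returns the same cached object
# and cannot block other threads on deserialization.
assert result.value() is warmed
```

This mirrors the SPARK-7655 reasoning in the comment: the expensive step is forced to happen before the locked region is entered.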
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito ping for review~~
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 @vanzin ping for review
[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16738 Should `successful` and `tasksSuccessful` be renamed to `completed` and `tasksCompleted`? I think that would make more sense.
[GitHub] spark pull request #16738: [SPARK-19398] remove one misleading log in TaskSe...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16738 [SPARK-19398] remove one misleading log in TaskSetManager. ## What changes were proposed in this pull request? The log below is misleading:
```
if (successful(index)) {
  logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
}
```
If a fetch failed, the task is marked as successful in `TaskSetManager.handleFailedTask`, and the log above is then printed. Here `successful` only means the task will not be scheduled again, not that it actually succeeded. ## How was this patch tested? Existing unit tests cover this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16738.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16738 commit 992bd3ce21d8c7f606307e4d99d236b7de2d82fe Author: jinxing Date: 2017-01-30T05:02:18Z [SPARK-19398] remove one misleading log in TaskSetManager.
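The failure mode this PR describes can be sketched with a toy model (hypothetical Python state, heavily simplified from `TaskSetManager`): a fetch failure sets the `successful` flag only to stop re-queuing, so a later failure of another attempt of the same task prints the "already succeeded" message even though nothing succeeded.

```python
# Toy per-task state: successful[i] == "do not re-queue task i".
successful = [False]
log_lines = []


def handle_failed_task(index, reason):
    """Toy model of the flow described in the PR."""
    if reason == "FetchFailed":
        # On fetch failure the whole stage is resubmitted, so the task is
        # marked "successful" here merely so this task set stops re-queuing it.
        successful[index] = True
    elif successful[index]:
        # Misleading: the earlier "success" was actually a fetch failure.
        log_lines.append(
            "task failed, but another instance of the task has already "
            "succeeded, so not re-queuing the task to be re-executed.")


handle_failed_task(0, "FetchFailed")        # flips the flag, no real success
handle_failed_task(0, "ExceptionFailure")   # emits the misleading log line
```

Running the two calls leaves one "already succeeded" line in `log_lines` even though both events were failures, which is exactly why the PR calls the log misleading.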
[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16690 @vanzin @zsxwing ping for review~
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Could you please take another look at this? : )
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r98043010 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -718,6 +703,21 @@ private[spark] class TaskSetManager( " because task " + index + " has already completed successfully") } maybeFinishTaskSet() +// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the +// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not +// "deserialize" the value when holding a lock to avoid blocking other threads. So we call +// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. +// Note: "result.value()" only deserializes the value when it's called at the first time, so +// here "result.value()" just returns the value and won't block other threads. +sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) +// Kill any other attempts for the same task (since those are unnecessary now that one +// attempt completed successfully). +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") + sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) +} --- End diff -- The `Success` is handled in `DAGScheduler` in a different thread, and `DAGScheduler` perhaps needs to check the `TaskSetManager`'s status, e.g. `isZombie`. Moving the code here makes that check safe.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 Failed to pass the unit tests. I will keep working on this.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 > hmm, this is a nuisance. I don't see any good way to get rid of this sleep ... but now that I think about it, why can't you do this in DAGSchedulerSuite? it seems like this can be entirely contained to the DAGScheduler and doesn't require tricky interactions with other parts of the scheduler. (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of DAGSchedulerSuite but there was some reason you couldn't.) `DAGSchedulerSuite` is quite hard for me, because this bug happens during the interaction between `DAGScheduler` and `TaskSchedulerImpl`: the conflicting exception is actually thrown in `TaskSchedulerImpl` when `submitTasks` is called from `DAGScheduler`. `DAGSchedulerSuite` only provides a very simple `TaskScheduler`; of course I could check for the conflict there, but I don't think that would be convincing enough. I don't like the `Thread.sleep(5000)` either, but I didn't find a better way. I'm sorry for adding `TestDAGScheduler` in `SchedulerIntegrationSuite` (just like `TestTaskScheduler`) to track more state, but perhaps it can also be useful in the future. If it is not preferred, I'm so sorry.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @squito Thanks a lot for your comments; they are very helpful. I've refined the code, please take another look : ) When handling `Success` of a `ShuffleMapTask`, what I want to do is check whether tasks are still running for other partitions; if so, do not resubmit even when `pendingPartitions.isEmpty && !stage.isAvailable`. There are two benefits: 1. The running tasks' successes still have a chance to update the map status of the `ShuffleMapStage` and turn it available; 2. It avoids submitting a conflicting `taskSet`.
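The resubmission condition described in this comment can be sketched as a small predicate (a toy sketch with a hypothetical signature, not the actual `DAGScheduler` code): resubmit the stage only when nothing is pending, the map outputs are incomplete, *and* no task set is still active, since a still-running task set may yet register the missing outputs, and resubmitting now would create a conflicting task set.

```python
def should_resubmit(pending_partitions_empty: bool,
                    stage_available: bool,
                    has_active_taskset: bool) -> bool:
    """Toy decision rule for resubmitting a shuffle map stage.

    Resubmit only when:
      - no partitions are pending in the current task set,
      - the stage's map outputs are still incomplete, and
      - no task set for this stage is still running (its remaining
        tasks may yet complete the map outputs).
    """
    return (pending_partitions_empty
            and not stage_available
            and not has_active_taskset)


# Tasks for other partitions still running: wait instead of resubmitting.
assert should_resubmit(True, False, True) is False
# Nothing running and outputs still missing: safe to resubmit now.
assert should_resubmit(True, False, False) is True
```

The first case is exactly the scenario in the comment: holding off gives the running attempts a chance to make the stage available without ever creating a second, conflicting task set.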
[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16690 [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry. ## What changes were proposed in this pull request? `ReceiverSupervisorImpl` on the executor side reports a block's metadata back to `ReceiverTracker` on the driver side. The current code uses `askWithRetry`. However, `ReceiverTracker` is not idempotent for `AddBlock`, which may result in messages being processed multiple times. *To reproduce*: 1. Check whether it is the first time `ReceiverTracker` receives `AddBlock`; if so, sleep long enough (say 200 seconds), so that the first RPC call times out in `askWithRetry` and `AddBlock` is resent. 2. Rebuild Spark and run the following job:
```
def streamProcessing(): Unit = {
  val conf = new SparkConf()
    .setAppName("StreamingTest")
    .setMaster(masterUrl)
  val ssc = new StreamingContext(conf, Seconds(200))
  val stream = ssc.socketTextStream("localhost", 1234)
  stream.print()
  ssc.start()
  ssc.awaitTermination()
}
```
To fix this: it makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218), because the Netty RPC layer will not drop messages. `askWithRetry` is a leftover from the Akka days; it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay much attention to when using it. ## How was this patch tested? Tested manually. The scenario described above doesn't happen with this patch. Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-19347 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16690.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16690 commit c5bcccf227446f5d044f8fb0518caa12cfef7421 Author: jinxing Date: 2017-01-24T09:33:23Z [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry
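The retry-on-timeout failure mode this PR describes can be sketched with a toy model (hypothetical Python names, not Spark's actual RPC code): the endpoint processes the first `AddBlock`, but its *reply* is lost to a timeout, so the retrying caller resends and the non-idempotent handler records the block twice.

```python
class ToyReceiverTracker:
    """Toy non-idempotent endpoint: every AddBlock appends, duplicates included."""

    def __init__(self):
        self.blocks = []

    def add_block(self, block_id):
        self.blocks.append(block_id)
        return "ack"


def ask_with_retry(endpoint, block_id, first_reply_times_out=True):
    """Toy model of a retry loop over an at-most-once-unaware endpoint.

    The endpoint processes the message on every delivery; when the first
    reply times out, the caller resends, and the endpoint processes the
    same AddBlock a second time.
    """
    endpoint.add_block(block_id)       # first attempt is processed...
    if first_reply_times_out:          # ...but its reply never arrives,
        endpoint.add_block(block_id)   # so the retry is processed too.
    return "ack"


tracker = ToyReceiverTracker()
ask_with_retry(tracker, "block-1")
assert tracker.blocks == ["block-1", "block-1"]  # duplicate AddBlock
```

This is why the PR argues for a single blocking `ask` over a transport that does not drop messages: with exactly one delivery, the handler's lack of idempotency no longer matters.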
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/16620 @markhamstra Thanks a lot for your comment, I've refined it; please take another look~