[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-06 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
Thanks a lot for the comments. I refined accordingly :)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-06 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@mridulm 
Thanks a lot for the comments. I refined accordingly. (btw, the time complexity of `rebalance` in `MedianHeap` is O(1).)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@mridulm 
Thanks a lot for your comments. I previously ran a test with `TreeSet` and 100k tasks, measuring the time spent on insertion. The results were 372ms, 362ms, 458ms, 429ms, 363ms, which I think is acceptable even though the time complexity is O(log n).
It would be great if you could give more advice; I'm glad to try more ideas and do measurements :)
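The benchmark code itself isn't quoted in this thread; a minimal sketch of that kind of measurement, where the object name, the synthetic workload, and the (duration, taskId) encoding are all assumptions mirroring the 100k figure above, might look like:

```scala
import scala.collection.mutable
import scala.util.Random

object TreeSetInsertTiming {
  def main(args: Array[String]): Unit = {
    val random = new Random(42)
    // (duration, taskId) pairs keep tasks with equal durations distinct in the set.
    val tasks = Array.tabulate(100000)(i => (random.nextInt(100000).toLong, i.toLong))

    val set = mutable.TreeSet.empty[(Long, Long)]
    val start = System.nanoTime()
    tasks.foreach(set += _) // each insertion is O(log n)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"Inserted ${set.size} entries in ${elapsedMs}ms")
  }
}
```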





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344524
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -754,7 +743,6 @@ private[spark] class TaskSetManager(
     }
     removeRunningTask(tid)
     info.markFinished(state)
-    successfulTaskIdsSet -= tid
--- End diff --

Removing a duration from `successfulTaskDurations` is quite time-consuming (O(n)) now. We only use `successfulTaskDurations` to compute the median duration, so I'd hesitate to do the removal.
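For context, `scala.collection.mutable.PriorityQueue` (which backs each half of `MedianHeap`) has no API for removing an arbitrary element, so a removal has to drain and rebuild the heap. A hypothetical helper illustrating why that is at least linear:

```scala
import scala.collection.mutable

// Hypothetical sketch: remove one occurrence of `x` from a priority queue.
// Draining and rebuilding touches every element, so the cost is O(n) or worse.
def removeOne(pq: mutable.PriorityQueue[Double], x: Double): Unit = {
  val kept = mutable.ArrayBuffer.empty[Double]
  var removed = false
  while (pq.nonEmpty) {
    val v = pq.dequeue()
    if (!removed && v == x) removed = true else kept += v
  }
  pq ++= kept // rebuild the heap from the survivors
}
```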






[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344274
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+/**
+ * MedianHeap stores numbers and returns the median in O(1) time.
+ * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores
+ * the smaller half of all numbers while the minHeap stores the larger half. The sizes
+ * of the two heaps need to be balanced each time a new number is inserted so that their
+ * sizes will not differ by more than 1. Therefore each time findMedian() is called we
+ * check if the two heaps have the same size. If they do, we return the average of the
+ * two top values of the heaps. Otherwise we return the top of the heap which has one
+ * more element.
+ */
+
+private[spark]
+class MedianHeap(implicit val ord: Ordering[Double]) {
+
+  // Stores all the numbers less than the current median in a maxHeap,
+  // i.e. median is the maximum, at the root.
+  val maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]])
+
+  // Stores all the numbers greater than the current median in a minHeap,
+  // i.e. median is the minimum, at the root.
+  val minHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]].reverse)
+
+  // Returns true if there is no element in MedianHeap.
+  def isEmpty(): Boolean = {
+    maxHeap.isEmpty && minHeap.isEmpty
+  }
+
+  // Size of MedianHeap.
+  def size(): Int = {
+    maxHeap.size + minHeap.size
+  }
+
+  // Insert a new number into MedianHeap.
+  def insert(x: Double): Unit = {
+    // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap.
+    if (isEmpty) {
+      minHeap.enqueue(x)
+    } else {
+      // If the number is larger than current median, it should be inserted into minHeap,
+      // otherwise maxHeap.
+      if (x > findMedian) {
+        minHeap.enqueue(x)
+      } else {
+        maxHeap.enqueue(x)
+      }
+    }
+    rebalance()
+  }
+
+  // Re-balance the heaps.
+  private[this] def rebalance(): Unit = {
+    if (minHeap.size - maxHeap.size > 1) {
+      maxHeap.enqueue(minHeap.dequeue())
+    }
+    if (maxHeap.size - minHeap.size > 1) {
+      minHeap.enqueue(maxHeap.dequeue())
+    }
+  }
+
+  // Returns the median of the numbers.
+  def findMedian(): Double = {
+    if (isEmpty) {
+      throw new NoSuchElementException("MedianHeap is empty.")
+    }
+    if (minHeap.size == maxHeap.size) {
+      (minHeap.head + maxHeap.head) / 2.0
--- End diff --

I think the average value is the definition of the median when there is an even count of numbers. Maybe it's better to keep it as `(minHeap.head + maxHeap.head) / 2.0`?
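A short usage sketch of the class above (it is `private[spark]`, so this assumes code living under the org.apache.spark package), showing both the even case discussed here and the odd case:

```scala
val heap = new MedianHeap()
Seq(1.0, 2.0, 3.0, 4.0).foreach(heap.insert)
assert(heap.findMedian() == 2.5) // even count: average of the two middle values

heap.insert(5.0)
assert(heap.findMedian() == 3.0) // odd count: the single middle value
```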





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-03-05 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r104344072
  
--- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.util
+import java.util.NoSuchElementException
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class MedianHeapSuite extends SparkFunSuite {
+
+  test("If no numbers in MedianHeap, NoSuchElementException is thrown.") {
+    val medianHeap = new MedianHeap()
+    var valid = false
+    try {
+      medianHeap.findMedian()
+    } catch {
+      case e: NoSuchElementException =>
+        valid = true
+    }
+
+    assert(valid)
+  }
+
+  test("Median should be correct when size of MedianHeap is odd or even") {
+    val random = new Random()
+    val medianHeap1 = new MedianHeap()
+    val array1 = new Array[Int](100)
+    (0 until 100).foreach {
+      case i =>
+        val randomNumber = random.nextInt(1000)
+        medianHeap1.insert(randomNumber)
+        array1(i) += randomNumber
+    }
+    util.Arrays.sort(array1)
+    assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0))
+
+    val medianHeap2 = new MedianHeap()
+    val array2 = new Array[Int](101)
+    (0 until 101).foreach {
+      case i =>
+        val randomNumber = random.nextInt(1000)
+        medianHeap2.insert(randomNumber)
+        array2(i) += randomNumber
+    }
+    util.Arrays.sort(array2)
+    assert(medianHeap2.findMedian() === array2(50))
+  }
+
+  test("Size of MedianHeap should be correct though there are duplicated numbers inside.") {
--- End diff --

random.nextInt(100) returns a value between 0 (inclusive) and the specified value (exclusive). If I call it 1000 times, there must be duplicates.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito 
Yes, some machine learning jobs in my cluster that do a Cartesian product have more than 100k tasks in the `TaskSetManager`.






[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-03-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout  @squito 
Thanks a lot for your comments, really helpful :)
I really think the median heap is a good idea. `slice` is `O(n)` and not the most efficient.
I'm working on an implementation of `MedianHeap`, which provides `add`, `remove` and `getMedian` methods.
Yes, for both `TreeSet` and `MedianHeap` we do an O(log n) insertion in `handleSuccessfulTask`. I ran a test (https://github.com/apache/spark/pull/17112) to measure the *sum of time* spent in `TreeSet.add`. There are 100k tasks in this test. The results are 372ms, 362ms, 458ms, 429ms, 363ms, so I think the O(log n) time complexity will not be a problem.





[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...

2017-03-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17133#discussion_r104273530
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -75,6 +75,8 @@ class TaskInfo(
   }
 
  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
--- End diff --

Yes, good idea.





[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...

2017-03-03 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17133#discussion_r104161512
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -75,6 +75,8 @@ class TaskInfo(
   }
 
  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+    // finishTime should be set larger than 0, otherwise "finished" below will return false.
+    assert(time != 0)
--- End diff --

Yes, I want to assert that `time > 0`.





[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...

2017-03-02 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17133#discussion_r104066996
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -695,7 +695,8 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markFinished(TaskState.FINISHED)
+    // Mark task as finished. Note that finishing time here should be bigger than 0.
--- End diff --

Thanks for the review :) What I want to say is that `finishTime` of `TaskInfo` should be set to a value larger than 0, otherwise `TaskInfo.finished` will return false. I removed the comment and added an `assert` inside `TaskInfo`.





[GitHub] spark issue #17133: [SPARK-19793] Use clock.getTimeMillis when mark task as ...

2017-03-02 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17133
  
I found this while working on https://github.com/apache/spark/pull/17112, which measures the approach I proposed in https://github.com/apache/spark/pull/16867.





[GitHub] spark pull request #17133: [SPARK-19793] Use clock.getTimeMillis when mark t...

2017-03-02 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17133

[SPARK-19793] Use clock.getTimeMillis when mark task as finished in 
TaskSetManager.

## What changes were proposed in this pull request?

`TaskSetManager` currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so developers cannot set a task's finishing time in unit tests. In `handleSuccessfulTask`, the task's duration is computed as `System.currentTimeMillis` minus the launch time (which can be set via `clock`), so the result is not correct.
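As a minimal sketch of what this change enables in a test, assuming Spark's `org.apache.spark.util.ManualClock` and omitting the `TaskSetManager` construction:

```scala
import org.apache.spark.util.ManualClock

val clock = new ManualClock()
clock.setTime(1000L)  // tasks "launch" at t=1000
// ... construct the TaskSetManager with this clock and launch a task ...
clock.advance(500L)   // simulate 500ms of work
// With this change, handleSuccessfulTask stamps the finish time from the same
// clock, so the computed duration is a deterministic 500ms.
```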

## How was this patch tested?
Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19793

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17133


commit 56cd922e562ac1aa7a97df01cc16f38a8b3bf250
Author: jinxing 
Date:   2017-03-02T09:33:53Z

[SPARK-19793] Use clock.getTimeMillis when mark task as finished in 
TaskSetManager.







[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...

2017-03-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17111
  
@kayousterhout 
Thanks for merging. 
(btw, I made some measurements for 
https://github.com/apache/spark/pull/16867 SPARK-16929, please take a look when 
you have time :) )





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-28 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
I added a measurement for this PR in #17112. Results are below. `newAlgorithm` indicates whether we use `TreeSet` to get the median duration or not, and `time cost` is the time used to get the median duration of the successful tasks. I did the measurement several times. `tasksNum` indicates how many tasks are in the `TaskSetManager`.
If `tasksNum=1000`:

| newAlgorithm | time cost |
| -- | -- |
| false | 5ms, 3ms, 4ms, 3ms, 3ms |
| true | 2ms, 4ms, 2ms, 2ms, 3ms |

If `tasksNum=100000`:

| newAlgorithm | time cost |
| -- | -- |
| false | 107ms, 109ms, 103ms, 100ms, 107ms |
| true | 17ms, 14ms, 14ms, 13ms, 14ms |

If `tasksNum=150000`:

| newAlgorithm | time cost |
| -- | -- |
| false | 133ms, 146ms, 127ms, 163ms, 114ms |
| true | 14ms, 13ms, 15ms, 16ms, 14ms |

As we can see, the new algorithm (`TreeSet`) has better performance than the old algorithm (`Arrays.sort`). When `tasksNum=100000`, `Arrays.sort` costs over 100ms every time, while with the new algorithm all runs are below 20ms.





[GitHub] spark issue #17112: Measurement for SPARK-16929.

2017-02-28 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17112
  
The unit test "Measurement for SPARK-16929." added is the measurement.
In TaskSetManagerSuite.scala line 1049, if `newAlgorithm=true`, 
`successfulTaskIdsSet `will be used to get the median duration. If 
`newAlgorithm=false`, old algorithm(`Arrays.sort`) will be used.

I calculate the time used for getting median duration in 
TaskSetManager.scala line 957.
If `tasksNum=1000`(TaskSetManagerSuite.scala line 1043), measurement is as 
below:

| newAlgorithm | time cost |
| --| -- |
| false | 5ms, 3ms, 4ms, 3ms, 3ms |
| true | 2ms, 4ms, 2ms, 2ms, 3ms |

| newAlgorithm | time cost |
| --| -- |
| false | 107ms, 109ms, 103ms, 100ms, 107ms |
| true | 17ms, 14ms, 14ms, 13ms, 14ms |

if `tasksNum=15`:

| newAlgorithm | time cost |
| --| -- |
| false | 133ms, 146ms, 127ms, 163ms, 114ms |
| true | 14ms, 13ms, 15ms, 16ms, 14ms |

As we can see, new algorithm(`TreeSet`) has better performance than old 
algorithm(`Arrays.sort`). When `tasksNum`=10, `Arrays.sort` costs over 
100ms every time, while in new algorithm all below 20ms. 





[GitHub] spark pull request #17112: Measurement for SPARK-16929.

2017-02-28 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17112

Measurement for SPARK-16929.

## What changes were proposed in this pull request?

This PR is not intended for merging. It's a measurement for https://github.com/apache/spark/pull/16867, which stores successful task IDs in a `TreeSet` (`successfulTaskIdsSet`), so the time complexity of getting the median duration in `checkSpeculatableTasks` is O(n/2).



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-16929-measurement

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17112


commit 6825bd741933df9bc25b1dd87080b3edfd2a3e30
Author: jinxing 
Date:   2017-03-01T02:50:19Z

Measurement for SPARK-16929.







[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...

2017-02-28 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17111
  
@squito
Thanks a lot :)





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-28 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout @squito 
It's great to open a new jira for this change. Please take a look at 
https://github.com/apache/spark/pull/17111.





[GitHub] spark issue #17111: [SPARK-19777] Scan runningTasksSet when check speculatab...

2017-02-28 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/17111
  
cc @kayousterhout @squito 





[GitHub] spark pull request #17111: [SPARK-19777] Scan runningTasksSet when check spe...

2017-02-28 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/17111

[SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.

## What changes were proposed in this pull request?

When checking speculatable tasks in `TaskSetManager`, only scan the `runningTasksSet` instead of scanning all `taskInfos`.

## How was this patch tested?
Existing tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19777

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17111


commit 819a5e8f5f44bb41e3e401b3b68689509184e580
Author: jinxing 
Date:   2017-02-28T07:40:57Z

[SPARK-19777] Scan runningTasksSet when check speculatable tasks in 
TaskSetManager.







[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-27 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16867#discussion_r103391138
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -911,14 +916,14 @@ private[spark] class TaskSetManager(
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTimeMillis()
-      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
-      Arrays.sort(durations)
+      val durations = successfulTasksSet.toArray.map(taskInfos(_).duration)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
       val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
-      for ((tid, info) <- taskInfos) {
+      for (tid <- runningTasksSet) {
+        val info = taskInfos(tid)
--- End diff --

@kayousterhout 
Thanks a lot for your comments :)
I will keep this simple change in this PR. For the time complexity improvement, I will open another PR and try to add some measurements.





[GitHub] spark issue #16989: [WIP][SPARK-19659] Fetch big blocks to disk when shuffle...

2017-02-27 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
@squito 
I've uploaded a design doc to jira, please take a look when you have time :)





[GitHub] spark issue #16867: [WIP][SPARK-16929] Improve performance when check specul...

2017-02-27 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@squito
Thanks a lot for your comments : )
>When check speculatable tasks in TaskSetManager, current code scan all 
task infos and sort durations of successful tasks in O(NlogN) time complexity.

`checkSpeculatableTasks` is scheduled every 100ms by `scheduleAtFixedRate` (not `scheduleWithFixedDelay`), thus the interval between runs can be less than 100ms. In my cluster (yarn-cluster mode), if the size of the task set is over 30 and the driver is running on a machine with poor CPU performance, the `Arrays.sort` can easily take over 100ms. Since `checkSpeculatableTasks` synchronizes on `TaskSchedulerImpl`, I suspect that's why my driver hangs.
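The distinction matters because the two scheduling modes behave differently when a run overruns its period; a small standalone sketch, where the 150ms stub is an assumption standing in for a slow `checkSpeculatableTasks`:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object SchedulingModes {
  // Stand-in for the real check; assume it can take longer than the 100ms period.
  val check = new Runnable {
    def run(): Unit = Thread.sleep(150)
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    // scheduleAtFixedRate measures the 100ms period from each run's *start*:
    // a 150ms run means the next run fires immediately after it, back to back.
    pool.scheduleAtFixedRate(check, 0, 100, TimeUnit.MILLISECONDS)
    // scheduleWithFixedDelay would instead wait 100ms after each run *ends*,
    // guaranteeing a gap even when a run overruns:
    // pool.scheduleWithFixedDelay(check, 0, 100, TimeUnit.MILLISECONDS)
    TimeUnit.SECONDS.sleep(1)
    pool.shutdownNow()
  }
}
```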

I get the median duration with `TreeSet.slice`, which comes from `IterableLike` and unfortunately cannot jump to the middle position, so the time complexity is O(n) in this PR.
I could get to the middle position by reflection, but I don't want to do that; I think it would hurt code clarity.





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-02-21 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
@squito 
Thanks a lot for your comments :)
Yes, there should be a design doc for discussion. I will prepare and post a PDF to the JIRA.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-20 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16901





[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16901
  
@kayousterhout 
I'll close since this functionality is already tested.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16867
  
@kayousterhout @squito 
Would you mind taking a look at this when you have time?





[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.

2017-02-19 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16790
  
@srowen @vanzin 
Thanks a lot for the work on this ~





[GitHub] spark issue #16989: [SPARK-19659] Fetch big blocks to disk when shuffle-read...

2017-02-19 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16989
  
@vanzin @squito 
Would you mind taking a look at this when you have time?





[GitHub] spark pull request #16989: [SPARK-19659] Fetch big blocks to disk when shuff...

2017-02-19 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16989

[SPARK-19659] Fetch big blocks to disk when shuffle-read.

## What changes were proposed in this pull request?

Currently a whole block is fetched into memory (off-heap by default) during shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can be large in skew situations. If OOM happens during shuffle-read, the job will be killed and users will be told to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM, but this approach is not well suited to production environments, especially data warehouses. Using Spark SQL as the data engine in a warehouse, users want a unified parameter (e.g. memory) with less resource waste (resource allocated but not used). It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM.

Fetching big blocks to disk is mentioned in SPARK-3019. In this PR, when `sendRequest` runs in `ShuffleBlockFetcherIterator`, we check whether `bytesInFlight` is over `maxBytesInFlight`; if so, we fetch remote blocks to disk by sending a `StreamRequest`. By setting `spark.reducer.maxSizeInFlight` properly, users can avoid OOM without causing performance degradation.
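A simplified sketch of the decision described above; the names below are illustrative stand-ins, not the actual `ShuffleBlockFetcherIterator` internals:

```scala
object FetchSketch {
  case class FetchRequest(blockId: String, size: Long)

  val maxBytesInFlight: Long = 48L * 1024 * 1024 // spark.reducer.maxSizeInFlight default (48m)
  var bytesInFlight: Long = 0L

  def fetchToMemory(req: FetchRequest): Unit = println(s"buffer ${req.blockId} in memory")
  def fetchToDisk(req: FetchRequest): Unit = println(s"stream ${req.blockId} to disk")

  def sendRequest(req: FetchRequest): Unit = {
    if (bytesInFlight + req.size > maxBytesInFlight) {
      fetchToDisk(req) // oversized or skewed block: take the disk path instead of risking OOM
    } else {
      bytesInFlight += req.size
      fetchToMemory(req)
    }
  }
}
```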

## How was this patch tested?
Not added yet.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19659

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16989


commit 21f6da36b127956bf35da088f1ecfeb55b307f3e
Author: jinxing 
Date:   2017-02-18T15:00:50Z

[SPARK-19659] Fetch big blocks to disk when shuffle-read.







[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.

2017-02-17 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16790
  
Both `askSync` and `askWithRetry` are blocking; the only difference is the "retry" (3 times by default) when the RPC fails. Callers of this method do not necessarily rely on this retry.
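A generic sketch of the retry behavior being dropped; the helper name is made up, and the 3-attempt default mirrors `spark.rpc.numRetries`:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: retry a blocking ask up to `maxRetries` times before giving up.
def askWithRetrySketch[T](maxRetries: Int = 3)(ask: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  var lastError: Throwable = null
  while (result.isEmpty && attempt < maxRetries) {
    attempt += 1
    Try(ask) match {
      case Success(v) => result = Some(v)
      case Failure(e) => lastError = e
    }
  }
  result.getOrElse(throw lastError)
}
```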





[GitHub] spark issue #16790: [SPARK-19450] Replace askWithRetry with askSync.

2017-02-16 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16790
  
https://github.com/apache/spark/pull/16690#discussion_r101616883 causes the build to produce lots of deprecation warnings.
@srowen @vanzin What do you think about this?





[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-02-16 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
@srowen 
What do you think about https://github.com/apache/spark/pull/16790?





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-15 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout @squito @markhamstra 
Thanks for all of your work on this patch. I really appreciate your help :)





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-15 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
Yes, refined : )





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Thanks a lot. I've refined the comment, please take another look.





[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16901
  
@squito
Thanks a lot for your comments. I've refined the comment.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100968529
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from the old attempt of stage(stageId=1), there's no pending
--- End diff --

Yes, the success should be moved. Sorry about this; I'll rectify it.





[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16901
  
@kayousterhout 
I've refined accordingly. Sorry for the stupid mistake I made. Please take 
another look at this : )





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-13 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout 
I've refined accordingly, please take another look : )





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-02-13 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r100953546
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
+    " even with late completions from earlier stage attempts") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      null))
+
+    scheduler.resubmitFailedStages()
+
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Task succeeds on a failed executor. The success is bogus.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // There should be no new attempt of stage submitted.
+    assert(taskSets.size === 4)
--- End diff --

Yes, I think so : )





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-12 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout 
Thanks a lot for the clear explanation. It makes great sense to me and help 
me understand the logic a lot. Also I think the way of testing is very good and 
make the code very clear. I've already refined this pr, please take a look when 
tests pass.
Also with understanding of your explanation above in 
>Scenario A (performance optimization, as discussed here already): This 
happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, 
it encountered a fetch failure, so the previous stage needed to be re-run to 
generate the missing output). ... 

I made #16901  to add a test that success of old attempt should be taken as 
valid and corresponding pending partition should be removed. Please give a look.





[GitHub] spark issue #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-12 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16901
  
@kayousterhout @squito @markhamstra
As mentioned in #16620, I think it might make sense to make this PR. Please take a look; if you think it is too trivial, I will close it.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-12 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16901

[SPARK-19565] Improve DAGScheduler tests.

## What changes were proposed in this pull request?

This is related to #16620. 
When a fetch fails, the stage will be resubmitted and there can be running tasks from both the old and the new stage attempts. This PR adds a test to check that a success from a task of the old stage attempt is taken as valid and the partitionId is removed from the stage's pendingPartitions accordingly. When pendingPartitions is empty, the downstream stage can be scheduled, even though there are still running tasks in the active (new) stage attempt.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16901.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16901


commit f5df21405f1eb31c7f32ffd7b32ed02abbc6d033
Author: jinxing 
Date:   2017-02-12T12:39:53Z

[SPARK-19565] Improve DAGScheduler tests.







[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-10 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16876
  
@kayousterhout 
It's great to give a definition of `pendingPartitions` in `ShuffleMapStage`.
May I ask a question to make my understanding of `pendingPartitions` clear? It means:
1. A partitionId gets removed from `pendingPartitions` because of a real success (not including ones from failed executors);
2. `pendingPartitions` is the exact mirror (in reverse) of the `ShuffleMapStage`'s output locations, thus when `pendingPartitions` is empty, all output locations are available;
3. When an executor is lost, we should check whether partitionIds need to be added back to `pendingPartitions` accordingly;
4. There is no need to consider resubmitting the stage when `pendingPartitions` is empty.

Is my understanding correct? It'd be great if you could help rectify any wrong points.





[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@kayousterhout Thanks a lot. Sorry for this and I'll be careful in the 
future.





[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-09 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16876
  
It's great to have pendingPartitions in ShuffleMapStage. 





[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...

2017-02-08 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16867

[SPARK-16929] Improve performance when check speculatable tasks.

## What changes were proposed in this pull request?

When checking for speculatable tasks in `TaskSetManager`, the current code scans
all task infos and sorts the durations of the successful tasks, which takes
O(N log N) time. Since `TaskSchedulerImpl`'s synchronized lock is held during the
check, this can degrade performance when checking a large task set, say hundreds
of thousands of tasks.

This change uses a `TreeSet` to cache the successful task infos and compares the
median duration with the running tasks, avoiding a scan over all task infos.
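
As an illustration of the idea (not the code in this PR), here is a minimal
running-median sketch: two heaps split the durations into a lower and an upper
half, so inserting a finished task's duration costs O(log n) and reading the
median costs O(1), with no re-sorting on each speculation check. `RunningMedian`
is a hypothetical name.

```scala
import scala.collection.mutable

class RunningMedian {
  private val lower = mutable.PriorityQueue.empty[Long]                         // max-heap: smaller half
  private val upper = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse) // min-heap: larger half

  def insert(d: Long): Unit = {
    if (lower.isEmpty || d <= lower.head) lower.enqueue(d) else upper.enqueue(d)
    // Rebalance so the two halves differ in size by at most one.
    if (lower.size > upper.size + 1) upper.enqueue(lower.dequeue())
    else if (upper.size > lower.size) lower.enqueue(upper.dequeue())
  }

  // Assumes at least one duration has been inserted.
  def median: Double =
    if (lower.size == upper.size) (lower.head + upper.head) / 2.0
    else lower.head.toDouble
}
```
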
## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-16929

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16867.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16867


commit 1169d118662a9bfdabe88238352fe834a28aee14
Author: jinxing 
Date:   2017-02-07T02:35:10Z

[SPARK-16929] Improve performance when check speculatable tasks.







[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-08 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@squito 
Many thanks for your help. You are such a kind person : )





[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-08 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@kayousterhout Thanks a lot.
I'm not sure how to start the unit tests automatically; do I have the right to do that?
BTW, may I ask a question: what is the proper way to run the unit tests locally?
Currently, I run a single unit test with `build/mvn test 
-DwildcardSuites=org.apache.spark.**Suite` and run all unit tests with `build/mvn 
-Pyarn -Phadoop-2.6 -Phive-thriftserver -Dhadoop.version=2.6.5 test`. Is that correct?
It'd be great if you could help me out or point me to a wiki I can refer to :)





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@markhamstra @squito @kayousterhout 
It would be great if you could give more comments on the above so I can 
continue working on this : )





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
As @squito mentioned:
>Before this, the DAGScheduler didn't really know anything about 
taskSetManagers. (In its current form, this pr uses a "leaked" handle via 
rootPool.getSortedTaskSetQueue). Is adding it here a mistake? An alternative 
would be to add a method to TaskScheduler like 
markTaskSetsForStageAsZombie(stageId: Int). But that is still basically 
exposing the idea of "zombie" tasksets to the dagscheduler, I dunno if its 
actually any cleaner.

I think this is a cleaner and simpler way to fix this bug, and we can 
avoid adding TSM info to the DAGScheduler.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout @squito @markhamstra 
Thanks a lot for reviewing this PR thus far. I do think the approach, which 
throws away task results from earlier attempts that were running on executors 
that failed, and takes `stage.pendingPartitions` as an exact mirror (in reverse) 
of the stage's output locations, can really fix this bug and makes the code quite clear.
But my previous understanding of `stage.pendingPartitions` was a little 
different, per the comment in `Stage` below:
```
  /**
   * Partitions the [[DAGScheduler]] is waiting on before it tries to mark the stage / job as
   * completed and continue. Tasks' successes in both the active taskset or earlier attempts
   * for this stage can cause partition ids get removed from pendingPartitions. Finally, note
   * that when this is empty, it does not necessarily mean that stage is completed -- Some of
   * the map output from that stage may have been lost. But the [[DAGScheduler]] will check for
   * this condition and resubmit the stage if necessary.
   */
```
Any task's success can result in its partition being removed from `pendingPartitions`, 
no matter whether it ran on a valid executor or a failed one. Thus when 
`pendingPartitions` becomes empty, we can check whether the stage's output locations 
are all available, and resubmit if not.

If we take `stage.pendingPartitions` as an exact mirror (in reverse) of the 
output locations, some unit tests in `DAGSchedulerSuite` cannot pass (e.g. `"run 
trivial shuffle with out-of-band failure and retry"`). Consider the scenario below:
1. A stage has ShuffleMapTask1 and ShuffleMapTask2, `pendingPartitions` = (0, 1);
2. ShuffleMapTask1 succeeds on executorA and its result reaches the driver, so 
`pendingPartitions` = (1);
3. ShuffleMapTask2 succeeds on executorA;
4. The driver hears that executorA is lost;
5. ShuffleMapTask2's success reaches the driver but is discarded, so 
`pendingPartitions` stays (1) and the stage never gets rescheduled.

In my understanding, `pendingPartitions` helps us track the progress of the 
`TaskSetManager`, know whether there are still task results on the way that are 
worth waiting for, and decide when to check whether the output locations are all 
available and whether to resubmit.








[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@kayousterhout 
Thanks a lot for the review. I've refined accordingly.





[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@squito
Thanks a lot for the review.





[GitHub] spark issue #16831: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16831
  
@kayousterhout @squito 
This was originally raised by @squito when reviewing 
https://github.com/apache/spark/pull/16620. Sorry for being too eager to make this 
small PR.





[GitHub] spark pull request #16831: [SPARK-19263] Fix race in SchedulerIntegrationSui...

2017-02-07 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16831

[SPARK-19263] Fix race in SchedulerIntegrationSuite.

## What changes were proposed in this pull request?

The whole process of offering resources and generating `TaskDescription`s 
should be guarded by `taskScheduler.synchronized` in `reviveOffers`; otherwise 
there is a race condition. A sketch of the pattern follows below.
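
A minimal self-contained sketch of the shape of the fix (illustrative names, not
the actual patch): the read of the schedulable state and the update that consumes
it sit under one lock, just as `reviveOffers` should hold `taskScheduler.synchronized`
across offering resources and generating `TaskDescription`s.

```scala
object ReviveOffersSketch {
  private val lock = new Object
  private var freeCores = 4

  // Check-then-act under a single lock: without it, two threads could both
  // observe freeCores >= coresNeeded and together launch more tasks than
  // there are cores, which is exactly the kind of race the patch removes.
  def tryLaunchTask(coresNeeded: Int): Boolean = lock.synchronized {
    if (freeCores >= coresNeeded) { freeCores -= coresNeeded; true }
    else false
  }
}
```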

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19263-FixRaceInTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16831.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16831


commit 05ce9446d386b2ad7827c2ca9f1cb38bf343e9a7
Author: jinxing 
Date:   2017-02-07T11:25:31Z

[SPARK-19263] fix race in SchedulerIntegrationSuite.







[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout @squito @markhamstra 

Thanks a lot for the comments. I've refined accordingly.
I still have one concern:
> If this is a correct description, I’d argue that (5) is the problem: 
that when ShuffleMapTask2 finishes, we should not be updating a bunch of state 
in the DAGScheduler saying that there’s output ready as a result. If I’m 
understanding correctly, there’s a relatively simple fix to this problem: In 
DAGScheduler.scala, in handleTaskCompletion, we should exit (and not update any 
state) when the task is from an earlier stage attempt that’s not the current 
active attempt. This can be done by changing the if-statement on line 1141 to 
include:
|| stageIdToStage(task.stageId).latestInfo.attemptId != task.stageAttemptId

With the above, are we ignoring all results from old stage attempts?
As @squito mentioned:
> It also can potentially improve performance, since you may submit 
downstream stages more quickly, rather than waiting for all tasks in the active 
taskset to complete.

Might it be beneficial to count the results from old stage attempts?





[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...

2017-02-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
@kayousterhout 
Thanks a lot again : )





[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...

2017-02-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
@kayousterhout 
Thanks a lot for helping with this PR thus far. I think the proposal is quite 
clear. I've refined accordingly. Please take another look.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Thanks a lot for helping with this PR thus far.
I've added a unit test in `DAGSchedulerSuite`, but I'm not sure it is exactly 
what you suggested.
I created a `mockTaskSchedulerImpl`. Since lots of state is maintained in 
`TaskSchedulerImpl`, I have to trigger the events via `resourceOffers`, 
`handleSuccessfulTask`, and `handleFailedTask`.
Please take another look at this when you have time. I'd really appreciate 
your help.





[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...

2017-02-05 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
Thanks a lot for the comments. Actually, I'm still not sure how to change 
this log, or whether to just remove it; I just think the log is confusing, and it 
is printed on every FetchFailed. Please give some suggestions if possible. If 
this is too trivial to fix, should I close this PR?





[GitHub] spark pull request #16808: [SPARK-19461] Remove some unused imports.

2017-02-05 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16808





[GitHub] spark pull request #16808: [SPARK-19461] Remove some unused imports.

2017-02-05 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16808

[SPARK-19461] Remove some unused imports.

## What changes were proposed in this pull request?

Remove some unused imports in `CoarseGrainedSchedulerBackend` and 
`YarnSchedulerBackend`.

## How was this patch tested?

No need to test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19461

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16808


commit 0db96a04b594f816edcddd2bd6e7e9a6e9d6c3a7
Author: jinxing 
Date:   2017-02-05T09:53:18Z

[SPARK-19461] Remove some unused imports.







[GitHub] spark issue #16738: [SPARK-19398] Change one misleading log in TaskSetManage...

2017-02-04 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
I just changed the log message, but I'm not sure it is clear enough.





[GitHub] spark pull request #16807: [SPARK-19398] Change one misleading log in TaskSe...

2017-02-04 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16807





[GitHub] spark pull request #16738: [SPARK-19398] Change one misleading log in TaskSe...

2017-02-04 Thread jinxing64
GitHub user jinxing64 reopened a pull request:

https://github.com/apache/spark/pull/16738

[SPARK-19398] Change one misleading log in TaskSetManager.

## What changes were proposed in this pull request?

The log below is misleading:

```
if (successful(index)) {
  logInfo(
s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
"but another instance of the task has already succeeded, " +
"so not re-queuing the task to be re-executed.")
}
```

On a fetch failure, the task is marked as successful in 
`TaskSetManager::handleFailedTask`, and then the log above is printed. The 
`successful` flag just means the task will not be scheduled any longer, not that 
it actually succeeded.

## How was this patch tested?
Existing unit tests can cover this.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16738.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16738


commit 6ccd9f5cb6c03bad6a7a01104a95c277218ffa2d
Author: jinxing 
Date:   2017-02-05T03:46:35Z

[SPARK-19398] Change one misleading log in TaskSetManager.







[GitHub] spark pull request #16807: [SPARK-19398] Change one misleading log in TaskSe...

2017-02-04 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16807

[SPARK-19398] Change one misleading log in TaskSetManager.

## What changes were proposed in this pull request?

The log below is misleading:

```
if (successful(index)) {
  logInfo(
s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
"but another instance of the task has already succeeded, " +
"so not re-queuing the task to be re-executed.")
}
```

On a fetch failure, the task is marked as successful in 
`TaskSetManager::handleFailedTask`, and then the log above is printed. The 
`successful` flag just means the task will not be scheduled any longer, not that 
it actually succeeded.

## How was this patch tested?
Existing unit tests can cover this.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16807


commit 6ccd9f5cb6c03bad6a7a01104a95c277218ffa2d
Author: jinxing 
Date:   2017-02-05T03:46:35Z

[SPARK-19398] Change one misleading log in TaskSetManager.







[GitHub] spark pull request #16738: [SPARK-19398] Change one misleading log in TaskSe...

2017-02-04 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16738





[GitHub] spark pull request #16790: [SPARK-19450] Replace askWithRetry with askSync.

2017-02-03 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16790

[SPARK-19450] Replace askWithRetry with askSync.

## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and 
https://github.com/apache/spark/pull/16690#issuecomment-276850068) and 
`askWithRetry` is marked as deprecated. 
As mentioned in SPARK-18113 
(https://github.com/apache/spark/pull/16503#event-927953218):

>askWithRetry is basically an unneeded API, and a leftover from the akka 
days that doesn't make sense anymore. It's prone to cause deadlocks (exactly 
because it's blocking), it imposes restrictions on the caller (e.g. 
idempotency) and other things that people generally don't pay that much 
attention to when using it.

Since `askWithRetry` is only used inside Spark and not in user logic, it 
makes sense to replace all of its usages with `askSync`. A sketch of the change 
follows below.
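
A minimal sketch of the mechanical change applied throughout (`AskSyncSketch` is
a hypothetical name; the endpoint and message are placeholders, and this compiles
against the Spark version of that time, where `askWithRetry` still existed as
deprecated):

```scala
import org.apache.spark.rpc.RpcEndpointRef

object AskSyncSketch {
  // Before: deprecated, blocking and retrying (retries require idempotent messages).
  def before(ref: RpcEndpointRef, msg: Any): Boolean = ref.askWithRetry[Boolean](msg)

  // After: a single synchronous ask with the default RPC timeout; a failure
  // surfaces to the caller instead of triggering a silent retry.
  def after(ref: RpcEndpointRef, msg: Any): Boolean = ref.askSync[Boolean](msg)
}
```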

## How was this patch tested?
This PR doesn't change code logic; existing unit tests cover it.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19450

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16790.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16790


commit b575c55ed9901b4dcf5b3e977a55f135b02aa4b1
Author: jinxing 
Date:   2017-02-03T17:29:05Z

[SPARK-19450] Replace askWithRetry with askSync.







[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...

2017-02-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
@kayousterhout 
Would you please take a look at this? It would be great if you could help review 
it : )





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Would you please take another look at this? Please give some advice if 
possible so I can continue working on it : )





[GitHub] spark issue #16779: [SPARK-19437] Rectify spark executor id in HeartbeatRece...

2017-02-03 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16779
  
Thanks a lot for reviewing this.





[GitHub] spark issue #16779: [SPARK-19437] Rectify spark executor id in HeartbeatRece...

2017-02-02 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16779
  
@zsxwing 
Thanks a lot for reviewing this. I'm not sure why the test doesn't start 
automatically.





[GitHub] spark pull request #16780: [SPARK-19438] Both reading and updating executorD...

2017-02-02 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16780





[GitHub] spark issue #16780: [SPARK-19438] Both reading and updating executorDataMap ...

2017-02-02 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16780
  
Thanks a lot for looking into this~ @zsxwing 
You are right; my understanding of this was incorrect. 
`CoarseGrainedSchedulerBackend.DriverEndpoint` is a `ThreadSafeRpcEndpoint`, 
so concurrent message processing is disabled (see the sketch below).
I'll close this PR.
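
A conceptual sketch of why that holds (illustrative only; the real traits live in
`org.apache.spark.rpc`):

```scala
// Sketch, not the actual Spark source: ThreadSafeRpcEndpoint is a marker trait.
// The RPC dispatcher guarantees that at most one message is being processed by
// such an endpoint at any time, so the handler body needs no extra locking
// around shared endpoint state.
trait RpcEndpoint {
  def receive: PartialFunction[Any, Unit]
}

trait ThreadSafeRpcEndpoint extends RpcEndpoint
```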





[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...

2017-02-02 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
@srowen 
Thanks a lot. I'll refine : )





[GitHub] spark pull request #16780: [SPARK-19438] Both reading and updating executorD...

2017-02-02 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16780

[SPARK-19438] Both reading and updating executorDataMap should be guarded 
by CoarseGrainedSchedulerBackend.this.synchronized when handle RegisterExecutor.

## What changes were proposed in this pull request?

Currently, when handling `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, 
`executorDataMap` is guarded by `CoarseGrainedSchedulerBackend.this.synchronized` 
only for the update, not for the read, which can leave `numPendingExecutors` 
incorrect.
The code looks like this:
```
if (executorDataMap.contains(executorId)) {
  executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  context.reply(true)
} else {
  ...
  CoarseGrainedSchedulerBackend.this.synchronized {
    executorDataMap.put(executorId, data)
    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }
    if (numPendingExecutors > 0) {
      numPendingExecutors -= 1
      logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
    }
  }
```
Consider SPARK-19437 and a scenario like the one below:
An executor sends `RegisterExecutor` twice via `askWithRetry`, with a very small 
interval between the two. It is then possible that both messages take the `else` 
branch, so `numPendingExecutors` is deducted twice and becomes incorrect. 
Currently, `askWithRetry` for `RegisterExecutor` only appears in some unit tests, 
but it makes sense to harden the handling of `RegisterExecutor` anyway.

**TO FIX**
Use `CoarseGrainedSchedulerBackend.this.synchronized` to guard `executorDataMap` 
for both the read and the update (see the sketch below).
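
A minimal self-contained sketch of the proposed fix (illustrative names, not the
actual patch): the duplicate-ID check and the update sit under one lock, so two
racing `RegisterExecutor` messages for the same ID cannot both take the `else`
branch and decrement `numPendingExecutors` twice.

```scala
import scala.collection.mutable

object RegisterExecutorSketch {
  private val executorDataMap = mutable.HashMap.empty[String, String]
  private var numPendingExecutors = 1

  def handleRegisterExecutor(executorId: String, data: String): Boolean =
    this.synchronized {
      if (executorDataMap.contains(executorId)) {
        false // duplicate registration: reject and leave the counters untouched
      } else {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) numPendingExecutors -= 1
        true
      }
    }
}
```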

## How was this patch tested?
The code logic is not changed, so this was manually tested. SPARK-19437 can also 
help verify this fix.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19438

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16780.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16780


commit 7bb3a438eff8c8ab9567a50ca59da2e179939e76
Author: jinxing 
Date:   2017-02-02T15:54:08Z

[SPARK-19438] Both reading and updating executorDataMap should be guarded 
by CoarseGrainedSchedulerBackend.this.synchronized when handle RegisterExecutor.







[GitHub] spark pull request #16779: [SPARK-19437] Rectify spark executor id in Heartb...

2017-02-02 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16779

[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.

## What changes were proposed in this pull request?

In the current code in `HeartbeatReceiverSuite`, the executor ids are set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executor id is sent to the driver at registration, as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
    RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```

When `CoarseGrainedSchedulerBackend` receives `RegisterExecutor`, the executor id 
is compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    ...
    executorDataMap.put(executorId, data)
    if (currentExecutorIdCounter < executorId.toInt) {
      currentExecutorIdCounter = executorId.toInt
    }
    ...
```

`executorId.toInt` will throw a `NumberFormatException` for an id like `"executor-1"`.

This unit test currently passes only because of `askWithRetry`: when the exception 
is caught, the RPC is retried, so the second attempt takes the duplicate-ID `if` 
branch and returns true.

**To fix**
Rectify the executor ids and replace `askWithRetry` with `askSync`; refer to 
https://github.com/apache/spark/pull/16690. A sketch follows below.
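
A sketch of the rectified fragment (reusing the names quoted above from the suite;
not the exact patch): numeric executor ids so that `executorId.toInt` parses, and
`askSync` so that an exception fails the test instead of being masked by a retry.

```scala
private val executorId1 = "1"
private val executorId2 = "2"

fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
  RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
```
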
## How was this patch tested?
This fix is for a unit test; there is no need to add another one.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19437

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16779


commit c8403eafbf11837e315a5e5732d7e2e4ccdc673d
Author: jinxing 
Date:   2017-02-02T14:14:35Z

[SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.







[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-02-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
Thanks a lot for reviewing this PR~





[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...

2017-02-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
@srowen @jasonmoore2k 
Thanks a lot for reviewing this PR~

> Should `successful` and `tasksSuccessful` be renamed to `completed` and `tasksCompleted`?

What do you think about the renaming above, plus removing the log below?
```
if (successful(index)) {
  logInfo(
s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
"but another instance of the task has already succeeded, " +
"so not re-queuing the task to be re-executed.")
}
```





[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-02-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
@vanzin 
Thanks a lot for helping with this PR~ I've refined it~
Please take another look~





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Thanks a lot for continuing to review this~ Your comments are very helpful~ 
Thank you so much for your help~~

> when we encounter the condition where there are no pending partitions, but 
there is an active taskset -- we just mark that taskset as inactive

That's a good idea, and it makes the code quite clear. I've modified it 
accordingly; please take another look.






[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-31 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r98819685
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1212,8 +1223,9 @@ class DAGScheduler(
 
       clearCacheLocs()
 
-      if (!shuffleStage.isAvailable) {
-        // Some tasks had failed; let's resubmit this shuffleStage
+      if (!shuffleStage.isAvailable && noActiveTaskSetManager) {
--- End diff --

Hrmm... yes, @squito, we shouldn't go to the else branch when the shuffleStage 
is not available but an active `TaskSetManager` exists.





[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-01-30 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
I feel very sorry if this is disturbing : )
@vanzin Thanks a lot for continuing to review this; I'll be more patient : )
Sorry again~~





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-30 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r98488916
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -718,6 +703,21 @@ private[spark] class TaskSetManager(
         " because task " + index + " has already completed successfully")
     }
     maybeFinishTaskSet()
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+    // Kill any other attempts for the same task (since those are unnecessary now that one
+    // attempt completed successfully).
+    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
+        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
+        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+    }
--- End diff --

@squito 
Yes, it makes sense to move this part before `maybeFinishTaskSet()`; I will 
refine it.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-30 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
ping for review~~





[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-01-30 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
@vanzin 
ping for review





[GitHub] spark issue #16738: [SPARK-19398] remove one misleading log in TaskSetManage...

2017-01-29 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16738
  
Should `successful` and `tasksSuccessful` be renamed to `completed` and 
`tasksCompleted`? I think that would make more sense.





[GitHub] spark pull request #16738: [SPARK-19398] remove one misleading log in TaskSe...

2017-01-29 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16738

[SPARK-19398] remove one misleading log in TaskSetManager.

## What changes were proposed in this pull request?

The log below is misleading:

```
if (successful(index)) {
  logInfo(
s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
"but another instance of the task has already succeeded, " +
"so not re-queuing the task to be re-executed.")
}
```

On a fetch failure, the task is marked as successful in 
TaskSetManager::handleFailedTask. Then the log above is printed. The successful 
flag just means the task will not be scheduled any longer, not that it actually succeeded.

## How was this patch tested?
Existing unit tests can cover this.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16738.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16738


commit 992bd3ce21d8c7f606307e4d99d236b7de2d82fe
Author: jinxing 
Date:   2017-01-30T05:02:18Z

[SPARK-19398] remove one misleading log in TaskSetManager.







[GitHub] spark issue #16690: [SPARK-19347] ReceiverSupervisorImpl can add block to Re...

2017-01-26 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16690
  
@vanzin @zsxwing 
ping for review~





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-26 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Could you please take another look at this? : )





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-26 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r98043010
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -718,6 +703,21 @@ private[spark] class TaskSetManager(
         " because task " + index + " has already completed successfully")
     }
     maybeFinishTaskSet()
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+    // Kill any other attempts for the same task (since those are unnecessary now that one
+    // attempt completed successfully).
+    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
+        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
+        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+    }
--- End diff --

The `Success` is handled in `DAGScheduler` on a different thread, and 
`DAGScheduler` perhaps needs to check the `TaskSetManager`'s status, e.g. 
`isZombie`. Moving the code here makes that check safe.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-25 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
Failed to pass the unit tests. I will keep working on this.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-25 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
> hmm, this is a nuisance. I don't see any good way to get rid of this sleep 
... but now that I think about it, why can't you do this in DAGSchedulerSuite? 
it seems like this can be entirely contained to the DAGScheduler and doesn't 
require tricky interactions with other parts of the scheduler. (I'm sorry I 
pointed you in the wrong direction earlier -- I thought perhaps you had tried 
to copy the examples of DAGSchedulerSuite but there was some reason you 
couldn't.)

`DAGSchedulerSuite` is quite hard for me here, because this bug happens during 
the interaction between `DAGScheduler` and `TaskSchedulerImpl`: the 
conflicting-TaskSet exception is actually thrown in `TaskSchedulerImpl` when 
`submitTasks` is called from `DAGScheduler`. `DAGSchedulerSuite` only provides 
a very simple `TaskScheduler`; I could check for the conflict there, but I 
don't think that would be convincing enough.

I don't like the `Thread.sleep(5000)` either, but I didn't find a better way. 
I'm sorry for adding `TestDAGScheduler` to `SchedulerIntegrationSuite`, 
alongside `TestTaskScheduler`, to track more state; perhaps it can also be 
used in the future. If that approach is not preferred, my apologies.
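
(As an aside: one generic pattern that can sometimes replace a fixed sleep in 
ScalaTest-based suites is polling an assertion with a timeout. The sketch 
below is illustrative only, not the actual suite code; `taskSetsSubmitted` is 
a hypothetical probe into the scheduler's state.)

```
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Retries the block until the assertion passes or the timeout expires,
// instead of blocking unconditionally for the full five seconds.
eventually(timeout(5.seconds), interval(50.millis)) {
  assert(taskSetsSubmitted.count(_.stageId == 0) == 1) // hypothetical probe
}
```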





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-25 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@squito 
Thanks a lot for your comments; they are very helpful. I've refined the code, 
please take another look : )

When handling `Success` of a `ShuffleMapTask`, what I want to do is check 
whether tasks are still running for other partitions; if so, do not resubmit 
when `pendingPartitions.isEmpty && !stage.isAvailable`. There are two benefits 
to this (see the sketch below):
1. Successes of the still-running tasks get a chance to register their map 
status with the `ShuffleMapStage` and turn it available;
2. It avoids submitting a conflicting `TaskSet`.
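
To make the intended guard concrete, here is a minimal, self-contained Scala 
sketch using stand-in types; it is not the real `DAGScheduler` / 
`ShuffleMapStage` code, and all names are hypothetical:

```
// Stand-in for the relevant ShuffleMapStage state (hypothetical, simplified).
case class StageState(pendingPartitions: Set[Int],
                      isAvailable: Boolean,
                      runningTasks: Int)

// What to do when a ShuffleMapTask succeeds.
def onShuffleMapTaskSuccess(stage: StageState): String =
  if (stage.pendingPartitions.nonEmpty) {
    "wait: some partitions are still pending"
  } else if (stage.isAvailable) {
    "submit child stages: all map outputs are registered"
  } else if (stage.runningTasks > 0) {
    // A still-running attempt may yet register its map status and turn the
    // stage available; resubmitting now could create a second, conflicting
    // TaskSet for the same stage.
    "wait: let the running attempts finish"
  } else {
    "resubmit: map outputs are missing and nothing is left running"
  }
```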





[GitHub] spark pull request #16690: [SPARK-19347] ReceiverSupervisorImpl can add bloc...

2017-01-24 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16690

[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker 
multiple times because of askWithRetry.

## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on the executor side reports block metadata back to 
`ReceiverTracker` on the driver side. The current code uses `askWithRetry`; 
however, `ReceiverTracker`'s handling of `AddBlock` is not idempotent, so 
retries may cause the same message to be processed multiple times.

*To reproduce*:
1. Make `ReceiverTracker` check whether it is receiving `AddBlock` for the 
first time; if so, sleep long enough (say 200 seconds) so that the first RPC 
call times out in `askWithRetry` and `AddBlock` is resent.
2. Rebuild Spark and run the following job:
```
  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl) // masterUrl is assumed to be defined elsewhere
    // A 200-second batch interval, long enough to span the injected delay.
    val ssc = new StreamingContext(conf, Seconds(200))
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print() // prints each batch, driving block reports to the tracker
    ssc.start()
    ssc.awaitTermination()
  }
```
To fix:
It makes sense to provide a blocking version of `ask` in `RpcEndpointRef`, as 
mentioned in SPARK-18113 
(https://github.com/apache/spark/pull/16503#event-927953218), because the 
Netty RPC layer does not drop messages. `askWithRetry` is a leftover from the 
Akka days; it imposes restrictions on the caller (e.g. idempotency) and other 
subtleties that people generally don't pay much attention to when using it.
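
To illustrate the idempotency hazard concretely, here is a minimal, 
self-contained sketch; `Tracker` and `AddBlock` below are hypothetical 
stand-ins, not Spark's real `ReceiverTracker` RPC:

```
import scala.collection.mutable

case class AddBlock(blockId: Long) // hypothetical stand-in message

class Tracker {
  val blocks = mutable.ArrayBuffer[Long]()
  // Non-idempotent handler: every delivery appends, even a duplicate.
  def receive(msg: AddBlock): Boolean = { blocks += msg.blockId; true }
}

val tracker = new Tracker()
tracker.receive(AddBlock(1)) // first attempt; suppose the reply times out
tracker.receive(AddBlock(1)) // askWithRetry resends the same message
println(tracker.blocks)      // ArrayBuffer(1, 1): the block is registered twice
```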


## How was this patch tested?
Tested manually. The scenario described above no longer happens with this patch.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19347

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16690


commit c5bcccf227446f5d044f8fb0518caa12cfef7421
Author: jinxing 
Date:   2017-01-24T09:33:23Z

[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker 
multiple times because of askWithRetry







[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-01-20 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/16620
  
@markhamstra 
Thanks a lot for your comment. I've refined the code; please take another 
look ~




