[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2017-04-08 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
Strictly speaking, this doesn't just affect pair RDDs. It affects any RDD
used in a `for` expression that involves a filter operation, which includes
explicit `if` clauses as well as pattern matches.

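For illustration, a minimal sketch of the desugaring (with the `withFilter`
alias from this PR in place; before it, the compiler falls back to `filter`
and emits a warning):

```scala
import org.apache.spark.rdd.RDD

// An explicit `if` clause in a `for` expression...
def evens(rdd: RDD[Int]): RDD[Int] =
  for (n <- rdd if n % 2 == 0) yield n

// ...is compiled to a withFilter call followed by map:
def evensDesugared(rdd: RDD[Int]): RDD[Int] =
  rdd.withFilter(n => n % 2 == 0).map(n => n)
```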




[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2017-03-21 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
The `(k, v) <- pairRDD` expression involves a pattern match, which the
compiler converts into a `filter`/`withFilter` call on items that match the
pattern.

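Roughly, a sketch of what the compiler generates for such a generator
(synthetic variable names omitted; `withFilter` as added by this PR):

```scala
import org.apache.spark.rdd.RDD

def swap(pairRDD: RDD[(String, Int)]): RDD[(Int, String)] = {
  // for ((k, v) <- pairRDD) yield (v, k)  compiles approximately to:
  pairRDD.withFilter {
    case (k, v) => true  // items matching the pattern pass the filter
    case _ => false      // unreachable for tuples, but still generated
  }.map {
    case (k, v) => (v, k)
  }
}
```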




[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2016-11-18 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
I don't get why you say that it "doesn't even work in general". Under what 
circumstances doesn't it work? I've never run into any problems with it. 

The "simple syntactic sugar" allows very clear, concise code to be written 
in many cases, and even lets you take advantage of Scala pattern matching for 
filtering. For example:
```scala
val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = for (NumberPairString(a, b) <- strings; n <- Seq(a, b)) yield n.toInt
// numbers.collect() yields Array[Int](1213, 999, 456, 789)
```

Without the `for` comprehension, you wind up with this significantly uglier 
and somewhat confusing chain of calls:

```scala
val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = strings.filter {
  case NumberPairString(_, _) => true
  case _ => false
}.flatMap {
  case NumberPairString(a, b) => Seq(a, b)
}.map(_.toInt)
// numbers.collect() yields Array[Int](1213, 999, 456, 789)
```
There are alternate ways to write this (e.g., with a single `flatMap` on a 
pattern match function that returns either 0 or 2 elements), but none of them 
are as clean and concise as the `for` version.

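For comparison, a sketch of that single-`flatMap` alternative (reusing
`strings` and `NumberPairString` from above):

```scala
val numbers = strings.flatMap {
  case NumberPairString(a, b) => Seq(a.toInt, b.toInt) // 2 elements
  case _ => Seq.empty[Int]                             // 0 elements
}
// numbers.collect() yields Array[Int](1213, 999, 456, 789)
```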




[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2016-11-17 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
I disagree strongly. I've used RDDs in `for` comprehensions for almost 2
years without issue. Being able to "extend `for` loops" in this way is a
major language feature of Scala that helps in writing expressive code.
Refusing to support it would be surprising and irritating to most Scala
developers.

Rich

On Nov 17, 2016 4:34 PM, "Reynold Xin" <notificati...@github.com> wrote:

> @reggert <https://github.com/reggert> @srowen <https://github.com/srowen>
> any reaction to my pushback?






[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2016-11-16 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
The only other weird case I've run into is trying to `flatMap` across 
multiple RDDs, e.g., `for (x <- rdd1; y <- rdd2) yield x + y`, but it simply 
won't compile because `RDD.flatMap` doesn't support it (for good reason). You 
_can_ do it for an RDD and a regular collection, e.g., `for (x <- rdd; y <- 1 
to 10) yield x + y`, which is a perfectly reasonable use case. Other than that, 
the only issues that I'm aware of are ones that would also affect regular 
(desugared) chained method calls.

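A sketch of both cases for reference:

```scala
import org.apache.spark.rdd.RDD

// Compiles: desugars to rdd.flatMap(x => (1 to 10).map(y => x + y)), and
// RDD.flatMap accepts any function returning a TraversableOnce.
def sums(rdd: RDD[Int]): RDD[Int] =
  for (x <- rdd; y <- 1 to 10) yield x + y

// Does not compile: would desugar to rdd1.flatMap(x => rdd2.map(...)),
// but a nested RDD is not a TraversableOnce.
// def pairs(rdd1: RDD[Int], rdd2: RDD[Int]): RDD[Int] =
//   for (x <- rdd1; y <- rdd2) yield x + y
```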




[GitHub] spark pull request #15899: [SPARK-18466] added withFilter method to RDD

2016-11-16 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/15899#discussion_r88267555
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -70,6 +70,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
 assert(!nums.isEmpty())
 assert(nums.max() === 4)
 assert(nums.min() === 1)
+assert((for (n <- nums if n > 2) yield n).collect().toList === List(3, 4))
--- End diff --

According to IntelliJ, the `for` comprehension desugars into
```scala
nums.withFilter(n => n > 2).map(n => n)
```





[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD

2016-11-16 Thread reggert
Github user reggert commented on the issue:

https://github.com/apache/spark/pull/15899
  
Using RDDs in `for` comprehensions dates back _at least_ to the examples
given in the old 2010 AMPLab paper "Spark: Cluster Computing with Working
Sets". `for` comprehensions are in no way limited to local collections. They
provide syntactic sugar that, for many use cases, makes the code easier to
comprehend than a series of chained method calls.
Other examples of using them for non-collections include the Scala `Future`
class as well as query construction using the Slick database access library.

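A familiar non-collection example with `Future`, using only the standard
library:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A Future is not a collection, yet it composes in a for comprehension
// because it supplies map and flatMap, which is all the desugaring needs.
val sum: Future[Int] = for {
  a <- Future(20)
  b <- Future(22)
} yield a + b

println(Await.result(sum, 1.second)) // 42
```
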
I agree that a similar change should be made for other abstractions built
on top of RDDs, such as `Dataset`s and `DStream`s, though. I'll add those when
I get a chance.





[GitHub] spark pull request #15899: [SPARK-18466] added withFilter method to RDD

2016-11-15 Thread reggert
GitHub user reggert opened a pull request:

https://github.com/apache/spark/pull/15899

[SPARK-18466] added withFilter method to RDD

## What changes were proposed in this pull request?

A `withFilter` method has been added to `RDD` as an alias for the `filter` 
method. When using `for` comprehensions, the Scala compiler prefers (and as of 
2.12, _requires_) the lazy `withFilter` method, only falling back to using the 
`filter` method (which, for regular collections, is non-lazy, but for RDDs is 
lazy). Prior to Scala 2.12, this fallback causes the compiler to emit a 
warning, and as of Scala 2.12, it results in an error.

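The shape of the change, inside `RDD[T]`, is roughly the following (a
sketch, not necessarily the exact patch):

```scala
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 * Alias for `filter`, which is already lazy for RDDs; provided so that
 * `for` comprehensions desugar without warnings (or, under 2.12, errors).
 */
def withFilter(f: T => Boolean): RDD[T] = filter(f)
```
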
## How was this patch tested?

`RDDSuite` was updated by adding a line to "basic operations" that 
duplicates the behavior of the `filter` test, but uses a `for` comprehension 
instead of a direct method call.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/reggert/spark feature/withfilter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15899.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15899


commit db9cf5a9627755e1c8e212b7f29d54dc8bed1c54
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date:   2016-11-16T05:52:27Z

[SPARK-18466] added withFilter method to RDD







[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-15 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-164967910
  
@andrewor14 I included a couple lines to propagate the local properties 
(from the thread that created the `ComplexFutureAction`) to each of the spawned 
jobs (all but the first of which may be launched from other threads, losing the 
local properties unless `setLocalProperties` is called). AFAIK this resolves 
SPARK-4514.

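A sketch of the shape of the fix (hypothetical wrapper; the accessors shown
are package-private inside Spark, so this only illustrates the idea):

```scala
import java.util.Properties
import org.apache.spark.SparkContext

class PropertyCapture(sc: SparkContext) {
  // Captured on the thread that creates the ComplexFutureAction:
  private val saved: Properties = sc.getLocalProperties

  // Called on whichever thread ends up submitting a later job, so the job
  // group and other local properties are not lost:
  def withSavedProperties[T](body: => T): T = {
    sc.setLocalProperties(saved)
    body
  }
}
```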




[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-13 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r47446916
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
+val executionContextInvoked = Promise[Unit]
+val fakeExecutionContext = new ExecutionContext {
+  override def execute(runnable: Runnable): Unit = {
+executionContextInvoked.success(())
+  }
+  override def reportFailure(t: Throwable): Unit = ()
+}
+val starter = Smuggle(new Semaphore(0))
+starter.drainPermits()
+val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr}
+val f = action(rdd)
+f.onComplete(_ => ())(fakeExecutionContext)
+// Here we verify that registering the callback didn't cause a thread to be consumed.
+assert(!executionContextInvoked.isCompleted)
+// Now allow the executors to proceed with task processing.
+starter.release(rdd.partitions.length)
+// Waiting for the result verifies that the tasks were successfully processed.
+// This mainly exists to verify that we didn't break task deserialization.
--- End diff --

Oh, is that all? :-)

Comment line removed.





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-10 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r47322111
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
+val executionContextInvoked = Promise[Unit]
+val fakeExecutionContext = new ExecutionContext {
+  override def execute(runnable: Runnable): Unit = {
+executionContextInvoked.success(())
+  }
+  override def reportFailure(t: Throwable): Unit = ()
+}
+val starter = Smuggle(new Semaphore(0))
+starter.drainPermits()
+val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr}
+val f = action(rdd)
+f.onComplete(_ => ())(fakeExecutionContext)
+// Here we verify that registering the callback didn't cause a thread to be consumed.
+assert(!executionContextInvoked.isCompleted)
+// Now allow the executors to proceed with task processing.
+starter.release(rdd.partitions.length)
+// Waiting for the result verifies that the tasks were successfully processed.
+// This mainly exists to verify that we didn't break task deserialization.
--- End diff --

If `mapPartitions` throws an exception, won't that cause the test to fail 
before we even get to `Await.result`?





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-09 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r47117656
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
+val executionContextInvoked = Promise[Unit]
+val fakeExecutionContext = new ExecutionContext {
+  override def execute(runnable: Runnable): Unit = {
+executionContextInvoked.success(())
+  }
+  override def reportFailure(t: Throwable): Unit = ()
+}
+val starter = Smuggle(new Semaphore(0))
+starter.drainPermits()
+val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr}
+val f = action(rdd)
+f.onComplete(_ => ())(fakeExecutionContext)
+// Here we verify that registering the callback didn't cause a thread to be consumed.
+assert(!executionContextInvoked.isCompleted)
+// Now allow the executors to proceed with task processing.
+starter.release(rdd.partitions.length)
+// Waiting for the result verifies that the tasks were successfully processed.
+// This mainly exists to verify that we didn't break task deserialization.
--- End diff --

To clarify, in the event that we've broken task deserialization, I would 
expect to see a `NotSerializableException` or similar error thrown from the 
`Await.result` call.





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-07 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r46909976
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
+val executionContextInvoked = Promise[Unit]
+val fakeExecutionContext = new ExecutionContext {
+  override def execute(runnable: Runnable): Unit = {
+executionContextInvoked.success(())
+  }
+  override def reportFailure(t: Throwable): Unit = ()
+}
+val starter = Smuggle(new Semaphore(0))
+starter.drainPermits()
+val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr}
+val f = action(rdd)
+f.onComplete(_ => ())(fakeExecutionContext)
+// Here we verify that registering the callback didn't cause a thread to be consumed.
+assert(!executionContextInvoked.isCompleted)
+// Now allow the executors to proceed with task processing.
+starter.release(rdd.partitions.length)
+// Waiting for the result verifies that the tasks were successfully processed.
+// This mainly exists to verify that we didn't break task deserialization.
--- End diff --

Unless I'm mistaken, the `Await.result` call will throw an exception if the 
job fails.





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-06 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-162326651
  
All I want for Christmas is a code review. :-)





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-12-02 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-161528329
  
@zsxwing @JoshRosen I just want to make sure that you guys haven't 
forgotten about this. I haven't heard anything in a week and a half.





[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...

2015-11-30 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-160829564
  
Any feedback?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-22 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158783297
  
SparkQA seems to be happy now.
@JoshRosen What else needs to be done to move forward on this? The current 
implementation on master requires a dedicated thread pool for all callbacks on 
FutureActions to avoid depleting the global ExecutionContext (and this fact is 
entirely undocumented), so it would be good to have these changes in 1.6.

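For context, the hazard is that the current `onComplete` hands every
callback to the executor immediately and blocks that thread until the job
finishes, roughly like this sketch (not the actual Spark code):

```scala
import scala.concurrent.ExecutionContext

// Each registered callback pins a pool thread for the job's entire
// duration; register more callbacks than the pool has threads and the
// pool is starved.
def blockingOnComplete[T](awaitResult: () => T)(callback: T => Unit)
                         (implicit executor: ExecutionContext): Unit =
  executor.execute(new Runnable {
    override def run(): Unit = callback(awaitResult()) // blocks here
  })
```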




[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-22 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158798727
  
It appears that this PR may also fix SPARK-4514.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-22 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158799901
  
Correction: This PR only fixes SPARK-4514 as long as the
`ComplexFutureAction` only spawns a single job. The job group information is
lost in subsequent jobs.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-22 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158800472
  
I've implemented a two-line fix for SPARK-4514.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-21 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158680579
  
I've come up with a reusable way to make use of semaphores to control 
timing of tasks during unit tests. Please see the `Smuggle` class and let me 
know what you think.

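The idea behind `Smuggle`, roughly (a sketch; the real class lives in
Spark's test sources and the details may differ): only a key crosses
serialization, and the shared object is looked back up in a JVM-local
registry, which works in local mode where driver and executors share a JVM.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

class SmuggleSketch[T] private (key: UUID) extends Serializable {
  // Resolved on the deserializing side; same JVM, same registry.
  def get: T = SmuggleSketch.registry(key).asInstanceOf[T]
}

object SmuggleSketch {
  private val registry = TrieMap.empty[UUID, Any]
  def apply[T](obj: T): SmuggleSketch[T] = {
    val key = UUID.randomUUID()
    registry(key) = obj
    new SmuggleSketch[T](key)
  }
}
```

This lets a test wrap a driver-side `Semaphore` and release permits to
control task timing, as the `testAsyncAction` helper quoted earlier does.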




[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-20 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-158609528
  
Can someone re-run the build?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-18 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-157972418
  
Any thoughts on the latest changes (besides the merge)?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-16 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-157254101
  
I took a stab at implementing the above refactoring, and it came out 
looking pretty nice, so I went ahead and committed it. Please take a look and 
let me know if you have any problems with it.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-16 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-157295993
  
I most certainly did not add all those classes (other than `JobSubmitter`)! 
@SparkQA is confused.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-15 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-156844499
  
@srowen Okay. I was just worried I was going to get blamed for breaking 
something. ;-)

What's left to do here?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-15 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-156868899
  
I'm not particularly happy about the (internal) API exposed by 
`ComplexFutureAction`. Having callers instantiate the action and then mutate it 
by calling the `run` and `submitJob` methods just seems sloppy and error-prone.

I think I would prefer that instead of having a `run` method that takes a 
closure returning `Future[T]`, `ComplexFutureAction` should accept a 
constructor parameter of type `JobSubmitter => Future[T]`, where `JobSubmitter` 
would be a (Spark-private) trait providing the `submitJob` method. This would 
make `ComplexFutureAction` more or less immutable after construction (except 
for cancellation) and prevent someone from calling `run` or `submitJob` from 
outside Spark and making a mess of things. This is a fairly major change
introducing a new trait, so I will hold off implementing it until I get some
positive feedback about it.

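A sketch of the proposed shape (hypothetical code, just to make the
suggestion concrete):

```scala
import scala.concurrent.{Future, Promise}
import org.apache.spark.FutureAction
import org.apache.spark.rdd.RDD

trait JobSubmitter {
  def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): FutureAction[R]
}

// The body is supplied at construction and wired to the promise once;
// nothing outside Spark can mutate the action afterwards.
class ComplexFutureActionSketch[T](body: JobSubmitter => Future[T],
                                   submitter: JobSubmitter) {
  private val p = Promise[T]()
  p.tryCompleteWith(body(submitter))
  def future: Future[T] = p.future
}
```
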
Additionally, it seems like certain common aspects of `SimpleFutureAction` 
and `ComplexFutureAction`  (such as the `_cancellation` field and the base 
implementation of `cancel`) could be pulled out into an implementation trait 
(i.e., `FutureActionLike`) to avoid duplicating code.

Thoughts?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-15 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44879232
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark._
--- End diff --

The line needs to change regardless, because an import was added. 
Explicitly specifying 5 imported classes causes the line to exceed 100 
characters, however.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864546
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -95,19 +102,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
 val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
 
 val buf = new Array[Array[T]](p.size)
-f.runJob(self,
+val job = f.submitJob(self,
   (it: Iterator[T]) => it.take(left).toArray,
   p,
   (index: Int, data: Array[T]) => buf(index) = data,
   Unit)
-
-buf.foreach(results ++= _.take(num - results.size))
-partsScanned += numPartsToTry
+job.flatMap {case _ =>
--- End diff --

Removed the `case`, but retained the `_`, as I think it makes it clear that 
we're not using the argument.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864566
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -276,10 +219,11 @@ class ComplexFutureAction[T] extends FutureAction[T] {
 
   override def value: Option[Try[T]] = p.future.value
 
-  def jobIds: Seq[Int] = jobs
+  def jobIds: Seq[Int] = subActions flatMap {_.jobIds}
--- End diff --

Done.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864560
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -66,14 +65,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
 val f = new ComplexFutureAction[Seq[T]]
-
-f.run {
-  // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
-  // is a cached thread pool.
-  val results = new ArrayBuffer[T](num)
-  val totalParts = self.partitions.length
-  var partsScanned = 0
-  while (results.size < num && partsScanned < totalParts) {
+// Cached thread pool to handle aggregation of subtasks.
+implicit val executionContext = AsyncRDDActions.futureExecutionContext
+val results = new ArrayBuffer[T](num)
+val totalParts = self.partitions.length
+
+/*
+  Recursively triggers jobs to scan partitions until either the requested
+  number of elements are retrieved, or the partitions to scan are exhausted.
+  This implementation is non-blocking, asynchronously handling the
+  results of each job and triggering the next job using callbacks on futures.
+ */
+def continue(partsScanned : Int) : Future[Seq[T]] =
+  if (results.size >= num || partsScanned >= totalParts) {
+Future.successful(results.toSeq)
+  }
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864605
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
 * action thread if it is being blocked by a job.
 */
+@DeveloperApi
 class ComplexFutureAction[T] extends FutureAction[T] {

-  // Pointer to the thread that is executing the action. It is set when the action is run.
-  @volatile private var thread: Thread = _
+  @volatile private var _cancelled = false

-  // A flag indicating whether the future has been cancelled. This is used in case the future
-  // is cancelled before the action was even run (and thus we have no thread to interrupt).
-  @volatile private var _cancelled: Boolean = false
-
-  @volatile private var jobs: Seq[Int] = Nil
+  @volatile private var subActions: List[FutureAction[_]] = Nil

   // A promise used to signal the future.
-  private val p = promise[T]()
+  private val p = Promise[T]()

-  override def cancel(): Unit = this.synchronized {
+  override def cancel(): Unit = synchronized {
 _cancelled = true
-if (thread != null) {
-  thread.interrupt()
-}
+p.tryFailure(new SparkException("Action has been cancelled"))
+subActions.foreach {_.cancel()}
--- End diff --

I personally like the curly braces because it makes it clear that it's a 
code block, but I'll change it anyway, because IntelliJ tends to get a little 
trigger-happy with auto-formatting when it sees them.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864614
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
 * action thread if it is being blocked by a job.
 */
+@DeveloperApi
 class ComplexFutureAction[T] extends FutureAction[T] {

-  // Pointer to the thread that is executing the action. It is set when the action is run.
-  @volatile private var thread: Thread = _
+  @volatile private var _cancelled = false

-  // A flag indicating whether the future has been cancelled. This is used in case the future
-  // is cancelled before the action was even run (and thus we have no thread to interrupt).
-  @volatile private var _cancelled: Boolean = false
-
-  @volatile private var jobs: Seq[Int] = Nil
+  @volatile private var subActions: List[FutureAction[_]] = Nil

   // A promise used to signal the future.
-  private val p = promise[T]()
+  private val p = Promise[T]()

-  override def cancel(): Unit = this.synchronized {
+  override def cancel(): Unit = synchronized {
 _cancelled = true
-if (thread != null) {
-  thread.interrupt()
-}
+p.tryFailure(new SparkException("Action has been cancelled"))
+subActions.foreach {_.cancel()}
   }

   /**
* Executes some action enclosed in the closure. To properly enable cancellation, the closure
* should use runJob implementation in this promise. See takeAsync for example.
*/
-  def run(func: => T)(implicit executor: ExecutionContext): this.type = {
-scala.concurrent.future {
-  thread = Thread.currentThread
-  try {
-p.success(func)
-  } catch {
-case e: Exception => p.failure(e)
-  } finally {
-// This lock guarantees when calling `thread.interrupt()` in `cancel`,
-// thread won't be set to null.
-ComplexFutureAction.this.synchronized {
-  thread = null
-}
-  }
-}
+  def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = {
+p.tryCompleteWith(func)
 this
   }

   /**
-   * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext
+   * Submit a job for execution and return a FutureAction holding the result.
+   * This is a wrapper around the same functionality provided by SparkContext
* to enable cancellation.
*/
-  def runJob[T, U, R](
+  def submitJob[T, U, R](
   rdd: RDD[T],
   processPartition: Iterator[T] => U,
   partitions: Seq[Int],
   resultHandler: (Int, U) => Unit,
-  resultFunc: => R) {
+  resultFunc: => R): FutureAction[R] = synchronized {
 // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
 // command need to be in an atomic block.
-val job = this.synchronized {
-  if (!isCancelled) {
-rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
-  } else {
-throw new SparkException("Action has been cancelled")
-  }
-}
-
-this.jobs = jobs ++ job.jobIds
-
-// Wait for the job to complete. If the action is cancelled (with an interrupt),
-// cancel the job and stop the execution. This is not in a synchronized block because
-// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
-try {
-  Await.ready(job, Duration.Inf)
-} catch {
-  case e: InterruptedException =>
-job.cancel()
-throw new SparkException("Action has been cancelled")
+if (!isCancelled) {
+  val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
+  subActions = job::subActions
--- End diff --

I don't think I've ever seen it done that way, and ScalaStyle doesn't seem 
to care.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864643
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark._
--- End diff --

I think IntelliJ did that automatically because the number of things being 
imported exceeded some threshold.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864625
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,27 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-if (!atMost.isFinite()) {
-  awaitResult()
-} else jobWaiter.synchronized {
-  val finishTime = System.currentTimeMillis() + atMost.toMillis
-  while (!isCompleted) {
-val time = System.currentTimeMillis()
-if (time >= finishTime) {
-  throw new TimeoutException
-} else {
-  jobWaiter.wait(finishTime - time)
-}
-  }
-}
+jobWaiter.completionFuture.ready(atMost)
 this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-ready(atMost)(permit)
-awaitResult() match {
-  case scala.util.Success(res) => res
-  case scala.util.Failure(e) => throw e
-}
+jobWaiter.completionFuture.ready(atMost)
+assert(value.isDefined, "Future has not completed properly")
+value.get.get
   }

   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
-executor.execute(new Runnable {
-  override def run() {
-func(awaitResult())
-  }
-})
+jobWaiter.completionFuture onComplete {_ => func(value.get)}
   }

   override def isCompleted: Boolean = jobWaiter.jobFinished

   override def isCancelled: Boolean = _cancelled

-  override def value: Option[Try[T]] = {
-if (jobWaiter.jobFinished) {
-  Some(awaitResult())
-} else {
-  None
-}
-  }
-
-  private def awaitResult(): Try[T] = {
-jobWaiter.awaitResult() match {
-  case JobSucceeded => scala.util.Success(resultFunc)
-  case JobFailed(e: Exception) => scala.util.Failure(e)
-}
-  }
+  override def value: Option[Try[T]] =
+jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}}
--- End diff --

Does not compile.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864655
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,27 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-if (!atMost.isFinite()) {
-  awaitResult()
-} else jobWaiter.synchronized {
-  val finishTime = System.currentTimeMillis() + atMost.toMillis
-  while (!isCompleted) {
-val time = System.currentTimeMillis()
-if (time >= finishTime) {
-  throw new TimeoutException
-} else {
-  jobWaiter.wait(finishTime - time)
-}
-  }
-}
+jobWaiter.completionFuture.ready(atMost)
 this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-ready(atMost)(permit)
-awaitResult() match {
-  case scala.util.Success(res) => res
-  case scala.util.Failure(e) => throw e
-}
+jobWaiter.completionFuture.ready(atMost)
+assert(value.isDefined, "Future has not completed properly")
+value.get.get
   }

   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
-executor.execute(new Runnable {
-  override def run() {
-func(awaitResult())
-  }
-})
+jobWaiter.completionFuture onComplete {_ => func(value.get)}
   }

   override def isCompleted: Boolean = jobWaiter.jobFinished

   override def isCancelled: Boolean = _cancelled

-  override def value: Option[Try[T]] = {
-if (jobWaiter.jobFinished) {
-  Some(awaitResult())
-} else {
-  None
-}
-  }
-
-  private def awaitResult(): Try[T] = {
-jobWaiter.awaitResult() match {
-  case JobSucceeded => scala.util.Success(resultFunc)
-  case JobFailed(e: Exception) => scala.util.Failure(e)
-}
-  }
+  override def value: Option[Try[T]] =
+jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}}
--- End diff --

I've replaced the curly braces with parentheses, though.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864690
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,50 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R])
+(starter: => Semaphore) : Unit = {
--- End diff --

Keeping it this way leaves the call sites a lot less cluttered.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864838
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
 * action thread if it is being blocked by a job.
 */
+@DeveloperApi
 class ComplexFutureAction[T] extends FutureAction[T] {

-  // Pointer to the thread that is executing the action. It is set when the action is run.
-  @volatile private var thread: Thread = _
+  @volatile private var _cancelled = false

-  // A flag indicating whether the future has been cancelled. This is used in case the future
-  // is cancelled before the action was even run (and thus we have no thread to interrupt).
-  @volatile private var _cancelled: Boolean = false
-
-  @volatile private var jobs: Seq[Int] = Nil
+  @volatile private var subActions: List[FutureAction[_]] = Nil

   // A promise used to signal the future.
-  private val p = promise[T]()
+  private val p = Promise[T]()

-  override def cancel(): Unit = this.synchronized {
+  override def cancel(): Unit = synchronized {
 _cancelled = true
-if (thread != null) {
-  thread.interrupt()
-}
+p.tryFailure(new SparkException("Action has been cancelled"))
+subActions.foreach {_.cancel()}
   }

   /**
* Executes some action enclosed in the closure. To properly enable cancellation, the closure
* should use runJob implementation in this promise. See takeAsync for example.
*/
-  def run(func: => T)(implicit executor: ExecutionContext): this.type = {
-scala.concurrent.future {
-  thread = Thread.currentThread
-  try {
-p.success(func)
-  } catch {
-case e: Exception => p.failure(e)
-  } finally {
-// This lock guarantees when calling `thread.interrupt()` in `cancel`,
-// thread won't be set to null.
-ComplexFutureAction.this.synchronized {
-  thread = null
-}
-  }
-}
+  def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = {
+p.tryCompleteWith(func)
 this
   }

   /**
-   * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext
+   * Submit a job for execution and return a FutureAction holding the result.
+   * This is a wrapper around the same functionality provided by SparkContext
* to enable cancellation.
*/
-  def runJob[T, U, R](
+  def submitJob[T, U, R](
   rdd: RDD[T],
   processPartition: Iterator[T] => U,
   partitions: Seq[Int],
   resultHandler: (Int, U) => Unit,
-  resultFunc: => R) {
+  resultFunc: => R): FutureAction[R] = synchronized {
 // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
 // command need to be in an atomic block.
-val job = this.synchronized {
-  if (!isCancelled) {
-rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
-  } else {
-throw new SparkException("Action has been cancelled")
-  }
-}
-
-this.jobs = jobs ++ job.jobIds
-
-// Wait for the job to complete. If the action is cancelled (with an interrupt),
-// cancel the job and stop the execution. This is not in a synchronized block because
-// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
-try {
-  Await.ready(job, Duration.Inf)
-} catch {
-  case e: InterruptedException =>
-job.cancel()
-throw new SparkException("Action has been cancelled")
+if (!isCancelled) {
+  val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
+  subActions = job::subActions
--- End diff --

I actually haven't seen Spark code that uses `::` at all (not that it 
doesn't exist - I just haven't looked at any files that use it).
My understanding is that using `::` as the "cons" operator is a notation 
inherited from ML. All of the (S)ML examples I have ever seen use it without 
any spaces, though it seems that Scala examples don't necessarily follow the 
same style convention. To me, using it without the spaces reads better, since 
it usually looks like a list of elements chained together, but it doesn't 
really matter to me either way.



[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44864860
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,50 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
   Await.result(f, Duration(20, "milliseconds"))
 }
   }
+
+  private def testAsyncAction[R](action: RDD[Int] => FutureAction[R])
+(starter: => Semaphore) : Unit = {
--- End diff --

To clarify: If I put the parameters in the same parameter list, the 
compiler forces me to be a lot more verbose at the call sites (with curly 
braces, `=>`, and so forth)

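An illustrative comparison with toy signatures (not the actual test code):

```scala
// Two parameter lists: a multi-line function literal passes as a bare block.
def curried[R](action: Int => R)(starter: => Unit): R = { starter; action(1) }

curried { n =>
  n * 2
}(println("starting"))

// One parameter list: the same call needs wrapping parentheses and a comma.
def uncurried[R](action: Int => R, starter: => Unit): R = { starter; action(1) }

uncurried({ n =>
  n * 2
}, println("starting"))
```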




[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44865053
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
  * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
  * action thread if it is being blocked by a job.
  */
+@DeveloperApi
 class ComplexFutureAction[T] extends FutureAction[T] {
 
-  // Pointer to the thread that is executing the action. It is set when the action is run.
-  @volatile private var thread: Thread = _
+  @volatile private var _cancelled = false
 
-  // A flag indicating whether the future has been cancelled. This is used in case the future
-  // is cancelled before the action was even run (and thus we have no thread to interrupt).
-  @volatile private var _cancelled: Boolean = false
-
-  @volatile private var jobs: Seq[Int] = Nil
+  @volatile private var subActions: List[FutureAction[_]] = Nil
 
   // A promise used to signal the future.
-  private val p = promise[T]()
+  private val p = Promise[T]()
 
-  override def cancel(): Unit = this.synchronized {
+  override def cancel(): Unit = synchronized {
     _cancelled = true
-    if (thread != null) {
-      thread.interrupt()
-    }
+    p.tryFailure(new SparkException("Action has been cancelled"))
+    subActions.foreach {_.cancel()}
   }
 
   /**
    * Executes some action enclosed in the closure. To properly enable cancellation, the closure
    * should use runJob implementation in this promise. See takeAsync for example.
    */
-  def run(func: => T)(implicit executor: ExecutionContext): this.type = {
-    scala.concurrent.future {
-      thread = Thread.currentThread
-      try {
-        p.success(func)
-      } catch {
-        case e: Exception => p.failure(e)
-      } finally {
-        // This lock guarantees when calling `thread.interrupt()` in `cancel`,
-        // thread won't be set to null.
-        ComplexFutureAction.this.synchronized {
-          thread = null
-        }
-      }
-    }
+  def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = {
+    p.tryCompleteWith(func)
     this
   }
 
   /**
-   * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext
+   * Submit a job for execution and return a FutureAction holding the result.
+   * This is a wrapper around the same functionality provided by SparkContext
    * to enable cancellation.
    */
-  def runJob[T, U, R](
+  def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
-      resultFunc: => R) {
+      resultFunc: => R): FutureAction[R] = synchronized {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
-    val job = this.synchronized {
-      if (!isCancelled) {
-        rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
-      } else {
-        throw new SparkException("Action has been cancelled")
-      }
-    }
-
-    this.jobs = jobs ++ job.jobIds
-
-    // Wait for the job to complete. If the action is cancelled (with an interrupt),
-    // cancel the job and stop the execution. This is not in a synchronized block because
-    // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
-    try {
-      Await.ready(job, Duration.Inf)
-    } catch {
-      case e: InterruptedException =>
-        job.cancel()
-        throw new SparkException("Action has been cancelled")
+    if (!isCancelled) {
+      val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
+      subActions = job::subActions
--- End diff --

I searched for "ml cons operator" in Google, and of all the hits on the
first page that included (ML, SML, or F#) code samples, all but one omitted the
spaces. Anyway, I've inserted spaces for the sake of consistency.



[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-156768512
  
It sure would be nice if Spark's unit tests were consistent. I add 
whitespace and suddenly the build fails?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-14 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-156769951
  
Unrelated to this PR (except that it may be partially responsible for the 
most recent test failure), there appears to be a race condition here:
```scala
if (!executionContext.isShutdown) {
  val f = Future { deleteFiles() }
```

(from `FileBasedWriteAheadLog.clean` in spark-streaming)

If the `ExecutionContext` shuts down after `isShutdown` is called but 
before the task underlying the `Future` is enqueued on it, an exception will be 
thrown, which appears to be what's happening in 
`CommonWriteAheadLogTests.logCleanUpTest`. I'm not certain why the 
`ExecutionContext` would be getting shut down, but it does seem like the test 
that's failing has an awfully short timeout, and according to the stack trace, 
the thread pool is very busy at the time of the failure.
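
One way to close that window, sketched with a bare `ExecutorService` rather
than the actual `FileBasedWriteAheadLog` code (`deleteFiles` here is a
hypothetical stand-in), is to treat `RejectedExecutionException` as the
authoritative signal instead of relying on `isShutdown` alone:

```scala
import java.util.concurrent.{Executors, ExecutorService, RejectedExecutionException}

// Hypothetical stand-in for the cleanup work.
def deleteFiles(): Unit = println("deleting old write-ahead log files")

// isShutdown alone is a check-then-act race: the pool can shut down between
// the check and the submission. Catching the rejection closes that window.
def scheduleCleanup(pool: ExecutorService): Boolean =
  try {
    pool.submit(new Runnable { override def run(): Unit = deleteFiles() })
    true
  } catch {
    case _: RejectedExecutionException => false // pool shut down concurrently
  }

val pool = Executors.newFixedThreadPool(1)
scheduleCleanup(pool) // true: task accepted
pool.shutdown()
scheduleCleanup(pool) // false: rejected, but handled instead of thrown
```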





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-12 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-156295760
  
Is there anything left to be done before merging this?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44564647
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,31 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
       Await.result(f, Duration(20, "milliseconds"))
     }
   }
+
+  private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) : Unit = {
+    val executorInvoked = Promise[Unit]
+    val fakeExecutionContext = new ExecutionContext {
+      override def execute(runnable: Runnable): Unit = {
+        executorInvoked.success(())
+      }
+      override def reportFailure(t: Throwable): Unit = ()
+    }
+    /*
+      We sleep here so that we get to the assertion before the job completes.
+      I wish there were a cleaner way to do this, but trying to use any sort of synchronization
+      with this fails due to task serialization.
+    */
+    val rdd = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr})
--- End diff --

I just copied the style of the test that appears immediately above this.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44564235
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -20,13 +20,16 @@ package org.apache.spark
 import java.util.Collections
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.api.java.JavaFutureAction
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import org.apache.spark.annotation.DeveloperApi
--- End diff --

I blame IntelliJ. Fixed. :-)





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44536897
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -105,6 +108,7 @@ trait FutureAction[T] extends Future[T] {
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
+@DeveloperApi
--- End diff --

I agree. I was afraid someone would yell at me if I started changing
classes from public to private, though.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44534066
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -105,6 +108,7 @@ trait FutureAction[T] extends Future[T] {
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
+@DeveloperApi
--- End diff --

Why are we exposing these classes as "stable API"? All of the public 
methods of `AsyncRDDFunctions` return `FutureAction`, not `SimpleFutureAction` 
or `ComplexFutureAction`. The fact that the latter two classes exist at all is 
an implementation detail. 





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155967768
  
Well, the tests that randomly fail passed this time. Yay?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155995354
  
Build failed again, in a test that has nothing to do with this PR. Those 
tests seem extremely flaky.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44603713
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
+    value.get.get
--- End diff --

Very well.  I'll add an assertion. 





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155945709
  
The Jenkins build has been failing due to ForkErrors being thrown by random
tests that have nothing to do with anything I changed. The error itself has no
message attached, so I have no idea what its problem is.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44576601
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,18 @@ private[spark] class JobWaiter[T](
     dagScheduler.cancelJob(jobId)
   }
 
-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
-    resultHandler(index, result.asInstanceOf[T])
-    finishedTasks += 1
-    if (finishedTasks == totalTasks) {
-      _jobFinished = true
-      jobResult = JobSucceeded
-      this.notifyAll()
+  override def taskSucceeded(index: Int, result: Any): Unit = {
+    /*
+      The resultHandler call must be synchronized in case resultHandler itself
+      is not thread safe.
+     */
+    synchronized(resultHandler(index, result.asInstanceOf[T]))
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44576577
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -28,17 +32,15 @@ private[spark] class JobWaiter[T](
     resultHandler: (Int, T) => Unit)
   extends JobListener {
 
-  private var finishedTasks = 0
-
-  // Is the job as a whole finished (succeeded or failed)?
-  @volatile
-  private var _jobFinished = totalTasks == 0
-
-  def jobFinished: Boolean = _jobFinished
-
+  private val finishedTasks = new AtomicInteger(0)
   // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
   // partition RDDs), we set the jobResult directly to JobSucceeded.
-  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
+  private val jobPromise : Promise[Unit] =
+    if (totalTasks == 0) Promise.successful(()) else Promise()
+
+  def jobFinished: Boolean = jobPromise.isCompleted
+
+  def completionFuture : Future[Unit] = jobPromise.future
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44568209
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -66,14 +65,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    */
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
     val f = new ComplexFutureAction[Seq[T]]
-
-    f.run {
-      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
-      // is a cached thread pool.
-      val results = new ArrayBuffer[T](num)
-      val totalParts = self.partitions.length
-      var partsScanned = 0
-      while (results.size < num && partsScanned < totalParts) {
+    // Cached thread pool to handle aggregation of subtasks.
+    implicit val executionContext = AsyncRDDActions.futureExecutionContext
+    val results = new ArrayBuffer[T](num)
+    val totalParts = self.partitions.length
+
+    /*
+      Recursively triggers jobs to scan partitions until either the requested
+      number of elements are retrieved, or the partitions to scan are exhausted.
+      This implementation is non-blocking, asynchronously handling the
+      results of each job and triggering the next job using callbacks on futures.
+     */
+    def continue(partsScanned : Int) : Future[Seq[T]] =
--- End diff --

Such as? Most of the body of this function was existing code that I just 
moved into here.
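
The control flow under discussion can be sketched outside Spark as follows (a
minimal sketch; `fetchPartition` is a hypothetical stand-in for submitting one
job over a partition):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}

// Each "job" is a Future; the next one is triggered from flatMap instead of
// by blocking a thread, which is the essence of the non-blocking takeAsync.
def takeNonBlocking(num: Int, totalParts: Int)
    (fetchPartition: Int => Future[Seq[Int]])
    (implicit ec: ExecutionContext): Future[Seq[Int]] = {
  val results = new ArrayBuffer[Int](num)
  def continue(partsScanned: Int): Future[Seq[Int]] =
    if (results.size >= num || partsScanned >= totalParts) {
      Future.successful(results.toList)
    } else {
      fetchPartition(partsScanned).flatMap { data =>
        results ++= data.take(num - results.size)
        continue(partsScanned + 1) // schedules the next job; no thread parks
      }
    }
  continue(0)
}
```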





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44570490
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -20,13 +20,16 @@ package org.apache.spark
 import java.util.Collections
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.api.java.JavaFutureAction
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import org.apache.spark.annotation.DeveloperApi
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44568927
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
--- End diff --

I don't think so. `Future.ready` takes an implicit `CanAwait` as a 
parameter, and in this case we're using the `CanAwait` passed in via the 
enclosing call. `Await.ready` would completely ignore the passed-in `CanAwait` 
and pass its own. The whole point of `CanAwait` is to verify that the developer 
explicitly intended to block. At this point in the code, that verification has 
already been done (by virtue of the `CanAwait` from the enclosing call).
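
The delegation pattern described here can be shown in isolation (a minimal
sketch, not Spark's `SimpleFutureAction`): inside an `Awaitable`
implementation, the enclosing call's permit is already in implicit scope, so
the inner future's own `ready` picks it up, whereas `Await.ready` would
substitute its own.

```scala
import scala.concurrent.{Awaitable, CanAwait, Future}
import scala.concurrent.duration.Duration

class DelegatingAwaitable[T](inner: Future[T]) extends Awaitable[T] {
  // The implicit `permit` from the caller flows straight into inner.ready,
  // preserving the caller's proof that blocking was intentional.
  override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    inner.ready(atMost)
    this
  }
  override def result(atMost: Duration)(implicit permit: CanAwait): T =
    inner.result(atMost)
}
```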





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44570514
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -95,19 +102,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
 
         val buf = new Array[Array[T]](p.size)
-        f.runJob(self,
+        val job = f.runJob(self,
           (it: Iterator[T]) => it.take(left).toArray,
           p,
           (index: Int, data: Array[T]) => buf(index) = data,
           Unit)
-
-        buf.foreach(results ++= _.take(num - results.size))
-        partsScanned += numPartsToTry
+        job flatMap {case _ =>
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44573118
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,14 @@ private[spark] class JobWaiter[T](
 dagScheduler.cancelJob(jobId)
   }
 
-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
+  override def taskSucceeded(index: Int, result: Any): Unit = {
--- End diff --

Hmm... I see what you mean. While nothing within `JobWaiter` itself needs
protection here, in probably all cases `resultHandler` is writing to an array.
That seems like it should be okay, since each thread writes to a different
index, but in practice it probably isn't, because the Java memory model makes
no visibility guarantees for those writes without synchronization. At the very
least, synchronization is necessary to ensure that all writes have been flushed
before the final result is returned from the various incarnations of `runJob`.
I'll wrap the `resultHandler` call in a `synchronized` block and add a comment.
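
A minimal model of the visibility concern (illustrative only, not the actual
`JobWaiter` code): task-completion threads write result slots, and the
collecting thread only reliably observes those writes if both sides
synchronize on a common monitor.

```scala
import scala.reflect.ClassTag

class ResultCollector[T: ClassTag](totalTasks: Int) {
  private val results = new Array[T](totalTasks)
  private var finished = 0

  // Called from task-completion threads; the monitor creates the
  // happens-before edge that bare array writes would not.
  def taskSucceeded(index: Int, value: T): Unit = synchronized {
    results(index) = value
    finished += 1
    if (finished == totalTasks) notifyAll()
  }

  // Called from the awaiting thread; acquiring the same monitor guarantees
  // it observes every slot written before the final count update.
  def awaitResults(): Seq[T] = synchronized {
    while (finished < totalTasks) wait()
    results.toList
  }
}
```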





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44577065
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -229,28 +183,15 @@ class ComplexFutureAction[T] extends FutureAction[T] {
   processPartition: Iterator[T] => U,
   partitions: Seq[Int],
   resultHandler: (Int, U) => Unit,
-  resultFunc: => R) {
+  resultFunc: => R) : FutureAction[R] = synchronized {
--- End diff --

Should we rename it to `submitJob`?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155894205
  
@andrewor14  Thanks. I've fixed the issues you pointed out and anything 
else I could find. See my question above regarding `resultFunc`, though.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44570092
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: 
JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): 
SimpleFutureAction.this.type = {
-if (!atMost.isFinite()) {
-  awaitResult()
-} else jobWaiter.synchronized {
-  val finishTime = System.currentTimeMillis() + atMost.toMillis
-  while (!isCompleted) {
-val time = System.currentTimeMillis()
-if (time >= finishTime) {
-  throw new TimeoutException
-} else {
-  jobWaiter.wait(finishTime - time)
-}
-  }
-}
+jobWaiter.completionFuture.ready(atMost)
 this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-ready(atMost)(permit)
-awaitResult() match {
-  case scala.util.Success(res) => res
-  case scala.util.Failure(e) => throw e
-}
+jobWaiter.completionFuture.ready(atMost)
+value.get.get
--- End diff --

That seems sort of redundant, given that `value.get` will throw an 
exception if the future somehow isn't completed (which anyway isn't possible on 
the line immediately after waiting for the future to complete).





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155877448
  
It seems like Jenkins is not in a happy place:

```
 > git fetch --tags --progress https://github.com/apache/spark.git +refs/pull/9264/*:refs/remotes/origin/pr/9264/* # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/spark.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:763)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1012)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1043)
at hudson.scm.SCM.checkout(SCM.java:485)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1275)
at 
```





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44574415
  
--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +120,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }
 
   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
+    value.get.get
   }
 
   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
-    executor.execute(new Runnable {
-      override def run() {
-        func(awaitResult())
-      }
-    })
+    jobWaiter.completionFuture onComplete {_ => func(value.get)}
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
 
   override def isCancelled: Boolean = _cancelled
 
-  override def value: Option[Try[T]] = {
-    if (jobWaiter.jobFinished) {
-      Some(awaitResult())
-    } else {
-      None
-    }
-  }
-
-  private def awaitResult(): Try[T] = {
-    jobWaiter.awaitResult() match {
-      case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception) => scala.util.Failure(e)
-    }
-  }
+  override def value: Option[Try[T]] =
+    jobWaiter.completionFuture.value map {res => res map {_ => resultFunc}}
--- End diff --

Fixed.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-11 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44576009
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,14 @@ private[spark] class JobWaiter[T](
 dagScheduler.cancelJob(jobId)
   }
 
-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
+  override def taskSucceeded(index: Int, result: Any): Unit = {
--- End diff --

I wonder... do we need to also put some sort of synchronization around 
`resultFunc` in `SimpleFutureAction` to ensure that the thread that reads the 
result can see all the writes?





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-10 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155642041
  
Why do we care about binary compatibility for an internal API that is only 
used in one place?
In any case, the implicit ExecutionContext being passed to 
`ComplexFutureAction.runJob` apparently wasn't being used. I don't really know 
what to do about the removed `Thread` field, since it's not needed and it's not 
a public API. `SimpleFutureAction` and `ComplexFutureAction` really ought to be 
marked `private[spark]` anyway.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-09 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155293780
  
@squito I merged with master 2 days ago, and more conflicts were introduced 
on master less than 24 hours later. I'll merge one more time in order to fix 
the issue with the accidentally-deleted Utils class, but I won't do it again 
after that until it's the only thing left to do.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-09 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155299515
  
Back up to date with master. All style issues should be resolved.





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-09 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-155308047
  
"[info] spark-mllib: found 0 potential binary incompatibilities (filtered 
64)
java.lang.RuntimeException: spark-core: Binary compatibility check failed!" 
 uh wat?!





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-09 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44371154
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,30 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
       Await.result(f, Duration(20, "milliseconds"))
     }
   }
+
+  test("SimpleFutureAction callback must not consume a thread while waiting") {
+    val executorInvoked = Promise[Unit]
+    val fakeExecutionContext = new ExecutionContext {
+      override def execute(runnable: Runnable): Unit = {
+        executorInvoked.success(())
+      }
+      override def reportFailure(t: Throwable): Unit = ???
+    }
+    val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).countAsync()
+    f.onComplete(_ => ())(fakeExecutionContext)
+    assert(!executorInvoked.isCompleted)
--- End diff --

I tried two other approaches (one using semaphores, one using actors) to 
come up with a "non-flaky" solution, but neither worked because the tasks get 
serialized and deserialized, even when running in local mode. It seems that 
"Thread.sleep" is the only viable approach, as ugly as it is. The async action 
should just go away after the job completes. I'll add a comment.
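
To make the serialization obstacle concrete, the following sketch (illustrative
only, not code from the PR) mimics what task serialization does to any captured
object: each task gets its own deserialized copy, so driver-side
synchronization state is never shared.

```scala
import java.io._
import java.util.concurrent.atomic.AtomicInteger

// Round-tripping a captured object the way task serialization does yields a
// distinct copy, not a shared handle to the driver's instance.
def roundTrip[A <: AnyRef](a: A): A = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(a)
  out.close() // flush buffered object data before reading it back
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  in.readObject().asInstanceOf[A]
}

val counter = new AtomicInteger(0) // Serializable, so it survives the trip
val taskCopy = roundTrip(counter)
taskCopy.incrementAndGet()
assert(counter.get == 0) // the "task" incremented its own copy, not ours
```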





[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...

2015-11-09 Thread reggert
Github user reggert commented on a diff in the pull request:

https://github.com/apache/spark/pull/9264#discussion_r44370757
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala ---
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType}
-
-/**
- * Utility functions used by the query planner to convert our plan to new aggregation code path.
- */
-object Utils {
--- End diff --

Git somehow managed to screw up the merge when I re-sync'd with the 
upstream master. I'll resurrect it.





[GitHub] spark pull request: [SPARK-11296] [SPARK-9026] Modifications to Jo...

2015-11-08 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-154877812
  
What else needs to be done to get this PR merged?





[GitHub] spark pull request: [SPARK-11296] [SPARK-9026] Modifications to Jo...

2015-11-07 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-154751770
  
I've added some unit tests that verify that my changes do, indeed, stop
callbacks from consuming threads before the jobs have completed.





[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...

2015-10-24 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-150851578
  
I agree that the JIRA issue is a duplicate. Odd that the old one didn't
turn up when I searched for existing issues. I've linked the issues in
JIRA.





[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...

2015-10-24 Thread reggert
Github user reggert commented on the pull request:

https://github.com/apache/spark/pull/9264#issuecomment-150851994
  
It looks like our approaches are very similar, though my changes to 
`JobWaiter` and `SimpleFutureAction` are a bit smaller, and I also included 
fixes for `ComplexFutureAction` and `AsyncRDDFunctions.takeAsync`. I haven't 
been able to fully test mine yet, though, because doing a full build of Spark 
causes my junky old laptop to go into seizures.





[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...

2015-10-24 Thread reggert
GitHub user reggert opened a pull request:

https://github.com/apache/spark/pull/9264

[SPARK-11296] Modifications to JobWaiter, FutureAction, and AsyncRDDActions 
to support non-blocking operation

These changes rework the implementations of SimpleFutureAction, 
ComplexFutureAction, JobWaiter, and AsyncRDDActions such that asynchronous 
callbacks on the generated Futures NEVER block waiting for a job to complete. A 
small amount of mutex synchronization is necessary to protect the internal 
fields that manage cancellation, but these locks are only held very briefly and 
in practice should almost never cause any blocking to occur. The existing 
blocking APIs of these classes are retained, but they simply delegate to the 
underlying non-blocking API and `Await` the results with indefinite timeouts.
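
The core of this rework can be modeled in a few lines (a simplified sketch,
not the actual Spark classes): the scheduler-side listener completes a
`Promise`, and callbacks are chained on the corresponding `Future` instead of
parking a thread in `Await`.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

class NonBlockingWaiter[T] {
  private val promise = Promise[T]()
  def completionFuture: Future[T] = promise.future

  // Invoked by the scheduler when the job finishes; never blocks.
  def jobFinished(result: Try[T]): Unit = promise.tryComplete(result)

  // Callbacks are registered, not awaited; no thread is parked while the
  // job is still running.
  def onComplete[U](f: Try[T] => U)(implicit ec: ExecutionContext): Unit =
    promise.future.onComplete(f)
}
```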

Associated JIRA ticket: https://issues.apache.org/jira/browse/SPARK-11296

This pull request contains all my own original work, which I release to the 
Spark project under its open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/reggert/spark fix-futureaction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9264.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9264


commit 4210aa6dc4d837988f101689bab6117404c00eb2
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date:   2015-10-24T02:30:18Z

reworked FutureAction and JobWaiter to avoid blocking or consuming threads 
while waiting for jobs

commit 1ad1abdd995a6347b2a18a31260168a987fee06b
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date:   2015-10-24T16:17:09Z

reworked ComplexFutureAction and AsyncRDDActions.takeAsync to be 
non-blocking



