[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 Strictly speaking, this doesn't just affect pair RDDs. It affects any RDD on which a `for` expression involving a filter operation is used, which includes explicit `if` clauses as well as pattern matches. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 The `(k, v) <- pairRDD` expression involves a pattern match, which the compiler converts into a `filter`/`withFilter` call on items that match the pattern.
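To make that desugaring concrete, here is a small, self-contained sketch using a local collection in place of an RDD (the element type and values are illustrative): a refutable pattern in a `for` generator makes scalac emit a `withFilter` call that silently drops non-matching items.

```scala
// A local stand-in for a pair RDD; List[Any] makes the (k, v) pattern refutable.
val mixed: List[Any] = List(("a", 1), "not a pair", ("b", 2))

// The pattern match in the generator silently filters out "not a pair".
val keys = for ((k, v) <- mixed) yield k

// Roughly what the compiler generates for the expression above:
val keysDesugared = mixed
  .withFilter {
    case (_, _) => true   // pattern check inserted by the compiler
    case _      => false
  }
  .map { case (k, v) => k } // pattern extraction

// Both evaluate to List("a", "b").
```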
[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 I don't get why you say that it "doesn't even work in general". Under what circumstances doesn't it work? I've never run into any problems with it. The "simple syntactic sugar" allows very clear, concise code to be written in many cases, and even lets you take advantage of Scala pattern matching for filtering. For example:

```scala
val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = for (NumberPairString(a, b) <- strings; n <- Seq(a, b)) yield n.toInt
// numbers.collect() yields Array[Int](1213, 999, 456, 789)
```

Without the `for` comprehension, you wind up with this significantly uglier and somewhat confusing chain of calls:

```scala
val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = strings.filter {
  case NumberPairString(_, _) => true
  case _ => false
}.flatMap {
  case NumberPairString(a, b) => Seq(a, b)
}.map(_.toInt)
// numbers.collect() yields Array[Int](1213, 999, 456, 789)
```

There are alternate ways to write this (e.g., with a single `flatMap` on a pattern-matching function that returns either 0 or 2 elements), but none of them are as clean and concise as the `for` version.
[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 I disagree strongly. I've used RDDs in `for` comprehensions for almost 2 years without issue. Being able to "extend `for` loops" in this way is a major language feature of Scala that helps in writing expressive code. Refusing to support it would be surprising and irritating to most Scala developers. Rich On Nov 17, 2016 4:34 PM, "Reynold Xin" <notificati...@github.com> wrote: > @reggert <https://github.com/reggert> @srowen <https://github.com/srowen> > any reaction to my pushback?
[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 The only other weird case I've run into is trying to `flatMap` across multiple RDDs, e.g., `for (x <- rdd1; y <- rdd2) yield x + y`, but it simply won't compile because `RDD.flatMap` doesn't support it (for good reason). You _can_ do it for an RDD and a regular collection, e.g., `for (x <- rdd; y <- 1 to 10) yield x + y`, which is a perfectly reasonable use case. Other than that, the only issues that I'm aware of are ones that would also affect regular (desugared) chained method calls.
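The RDD-plus-local-collection case desugars into an ordinary `flatMap`/`map` chain. A minimal illustration, with a local `List` standing in for the RDD (values are made up):

```scala
val xs = List(100, 200) // stand-in for an RDD[Int]

// The comprehension mixing the "RDD" with a local range...
val sums = for (x <- xs; y <- 1 to 3) yield x + y

// ...desugars into a flatMap over the outer generator and a map over the inner one:
val sumsDesugared = xs.flatMap(x => (1 to 3).map(y => x + y))

// Both evaluate to List(101, 102, 103, 201, 202, 203).
```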
[GitHub] spark pull request #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/15899#discussion_r88267555 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -70,6 +70,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!nums.isEmpty()) assert(nums.max() === 4) assert(nums.min() === 1) +assert((for (n <- nums if n > 2) yield n).collect().toList === List(3, 4)) --- End diff -- According to IntelliJ, the `for` comprehension desugars into ```scala nums.withFilter(n => n > 2).map(n => n) ```
[GitHub] spark issue #15899: [SPARK-18466] added withFilter method to RDD
Github user reggert commented on the issue: https://github.com/apache/spark/pull/15899 Using RDDs in `for` comprehensions dates back _at least_ to the examples given in the old 2010 AMPLab paper "Spark: Cluster Computing with Working Sets". `for` comprehensions are in no way limited to local collections. They provide syntactic sugar that, for many use cases, makes the code easier to comprehend than a series of chained method calls. Other examples of using them for non-collections include the Scala `Future` class as well as query construction in the Slick database access library. I agree that a similar change should be made for other abstractions built on top of RDDs, such as `Dataset`s and `DStream`s, though. I'll add those when I get a chance.
[GitHub] spark pull request #15899: [SPARK-18466] added withFilter method to RDD
GitHub user reggert opened a pull request: https://github.com/apache/spark/pull/15899 [SPARK-18466] added withFilter method to RDD

## What changes were proposed in this pull request?

A `withFilter` method has been added to `RDD` as an alias for the `filter` method. When using `for` comprehensions, the Scala compiler prefers (and as of 2.12, _requires_) the lazy `withFilter` method, only falling back to the `filter` method (which, for regular collections, is non-lazy, but for RDDs is lazy). Prior to Scala 2.12, this fallback causes the compiler to emit a warning, and as of Scala 2.12, it results in an error.

## How was this patch tested?

`RDDSuite` was updated by adding a line to "basic operations" that duplicates the behavior of the `filter` test, but uses a `for` comprehension instead of a direct method call.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/reggert/spark feature/withfilter

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15899.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15899

commit db9cf5a9627755e1c8e212b7f29d54dc8bed1c54
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date: 2016-11-16T05:52:27Z

[SPARK-18466] added withFilter method to RDD
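Not Spark's actual code, but a minimal self-contained sketch of the shape of the change (`MiniRDD` is a toy stand-in for `org.apache.spark.rdd.RDD`): `withFilter` is simply an alias for the already-lazy `filter`, which lets the compiler's preferred desugaring resolve without warnings.

```scala
// Toy stand-in for RDD; real RDDs evaluate lazily, but the alias works the same way.
final class MiniRDD[T](elems: Seq[T]) {
  def filter(p: T => Boolean): MiniRDD[T] = new MiniRDD(elems.filter(p))
  def withFilter(p: T => Boolean): MiniRDD[T] = filter(p) // the alias this PR adds
  def map[U](f: T => U): MiniRDD[U] = new MiniRDD(elems.map(f))
  def collect(): Seq[T] = elems
}

val nums = new MiniRDD(Seq(1, 2, 3, 4))
// Desugars to nums.withFilter(n => n > 2).map(n => n), which now resolves cleanly.
val big = for (n <- nums if n > 2) yield n
// big.collect() == Seq(3, 4)
```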
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-164967910 @andrewor14 I included a couple of lines to propagate the local properties (from the thread that created the `ComplexFutureAction`) to each of the spawned jobs (all but the first of which may be launched from other threads, losing the local properties unless `setLocalProperties` is called). AFAIK this resolves SPARK-4514.
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r47446916 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { +val executionContextInvoked = Promise[Unit] +val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { +executionContextInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = () +} +val starter = Smuggle(new Semaphore(0)) +starter.drainPermits() +val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} +val f = action(rdd) +f.onComplete(_ => ())(fakeExecutionContext) +// Here we verify that registering the callback didn't cause a thread to be consumed. +assert(!executionContextInvoked.isCompleted) +// Now allow the executors to proceed with task processing. +starter.release(rdd.partitions.length) +// Waiting for the result verifies that the tasks were successfully processed. +// This mainly exists to verify that we didn't break task deserialization. --- End diff -- Oh, is that all? :-) Comment line removed.
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r47322111 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { +val executionContextInvoked = Promise[Unit] +val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { +executionContextInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = () +} +val starter = Smuggle(new Semaphore(0)) +starter.drainPermits() +val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} +val f = action(rdd) +f.onComplete(_ => ())(fakeExecutionContext) +// Here we verify that registering the callback didn't cause a thread to be consumed. +assert(!executionContextInvoked.isCompleted) +// Now allow the executors to proceed with task processing. +starter.release(rdd.partitions.length) +// Waiting for the result verifies that the tasks were successfully processed. +// This mainly exists to verify that we didn't break task deserialization. --- End diff -- If `mapPartitions` throws an exception, won't that cause the test to fail before we even get to `Await.result`?
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r47117656 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { +val executionContextInvoked = Promise[Unit] +val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { +executionContextInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = () +} +val starter = Smuggle(new Semaphore(0)) +starter.drainPermits() +val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} +val f = action(rdd) +f.onComplete(_ => ())(fakeExecutionContext) +// Here we verify that registering the callback didn't cause a thread to be consumed. +assert(!executionContextInvoked.isCompleted) +// Now allow the executors to proceed with task processing. +starter.release(rdd.partitions.length) +// Waiting for the result verifies that the tasks were successfully processed. +// This mainly exists to verify that we didn't break task deserialization. --- End diff -- To clarify, in the event that we've broken task deserialization, I would expect to see a `NotSerializableException` or similar error thrown from the `Await.result` call.
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r46909976 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,34 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { +val executionContextInvoked = Promise[Unit] +val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { +executionContextInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = () +} +val starter = Smuggle(new Semaphore(0)) +starter.drainPermits() +val rdd = sc.parallelize(1 to 100, 4).mapPartitions {itr => starter.acquire(1); itr} +val f = action(rdd) +f.onComplete(_ => ())(fakeExecutionContext) +// Here we verify that registering the callback didn't cause a thread to be consumed. +assert(!executionContextInvoked.isCompleted) +// Now allow the executors to proceed with task processing. +starter.release(rdd.partitions.length) +// Waiting for the result verifies that the tasks were successfully processed. +// This mainly exists to verify that we didn't break task deserialization. --- End diff -- Unless I'm mistaken, the `Await.result` call will throw an exception if the job fails.
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-162326651 All I want for Christmas is a code review. :-)
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-161528329 @zsxwing @JoshRosen I just want to make sure that you guys haven't forgotten about this. I haven't heard anything in a week and a half.
[GitHub] spark pull request: [SPARK-9026] [SPARK-4514] Modifications to Job...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-160829564 Any feedback?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158783297 SparkQA seems to be happy now. @JoshRosen What else needs to be done to move forward on this? The current implementation on master requires a dedicated thread pool for all callbacks on FutureActions to avoid depleting the global ExecutionContext (and this fact is entirely undocumented), so it would be good to have these changes in 1.6.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158798727 It appears that this PR may also fix SPARK-4514.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158799901 Correction: This PR only fixes SPARK-4514 as long as the `ComplexFutureAction` only spawns a single job. The job group information is lost in subsequent jobs.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158800472 I've implemented a two-line fix for SPARK-4514.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158680579 I've come up with a reusable way to make use of semaphores to control timing of tasks during unit tests. Please see the `Smuggle` class and let me know what you think.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-158609528 Can someone re-run the build?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-157972418 Any thoughts on the latest changes (besides the merge)?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-157254101 I took a stab at implementing the above refactoring, and it came out looking pretty nice, so I went ahead and committed it. Please take a look and let me know if you have any problems with it.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-157295993 I most certainly did not add all those classes (other than `JobSubmitter`)! @SparkQA is confused.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-156844499 @srowen Okay. I was just worried I was going to get blamed for breaking something. ;-) What's left to do here?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-156868899 I'm not particularly happy about the (internal) API exposed by `ComplexFutureAction`. Having callers instantiate the action and then mutate it by calling the `run` and `submitJob` methods just seems sloppy and error-prone. I think I would prefer that instead of having a `run` method that takes a closure returning `Future[T]`, `ComplexFutureAction` should accept a constructor parameter of type `JobSubmitter => Future[T]`, where `JobSubmitter` would be a (Spark-private) trait providing the `submitJob` method. This would make `ComplexFutureAction` more or less immutable after construction (except for cancellation) and prevent someone from calling `run` or `submitJob` from outside Spark and making a mess of things. This is a fairly major change introducing a new trait, so I will hold off implementing it until I get some positive feedback about it.

Additionally, it seems like certain common aspects of `SimpleFutureAction` and `ComplexFutureAction` (such as the `_cancellation` field and the base implementation of `cancel`) could be pulled out into an implementation trait (i.e., `FutureActionLike`) to avoid duplicating code. Thoughts?
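A simplified, self-contained sketch of the proposed design (the names and signatures here are illustrative, not Spark's actual API): the caller is handed a `JobSubmitter` and returns a `Future`, so the action never needs a mutating `run` method after construction.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// The capability trait: callers can submit work, but cannot mutate the action.
trait JobSubmitter {
  def submitJob[R](body: () => R): Future[R]
}

// The body function is a constructor parameter, so the action is effectively
// immutable after construction (simplified: real jobs would run on a cluster).
final class ComplexAction[T](run: JobSubmitter => Future[T]) {
  private val submitter: JobSubmitter = new JobSubmitter {
    def submitJob[R](body: () => R): Future[R] = Future(body())
  }
  val future: Future[T] = run(submitter)
}

// Usage: chain two submitted "jobs" without exposing submitJob externally.
val action = new ComplexAction[Int](submitter =>
  for {
    a <- submitter.submitJob(() => 20)
    b <- submitter.submitJob(() => 22)
  } yield a + b
)
// Await.result(action.future, 5.seconds) == 42
```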
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44879232 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ --- End diff -- The line needs to change regardless, because an import was added. Explicitly specifying 5 imported classes causes the line to exceed 100 characters, however.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864546 --- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala --- @@ -95,19 +102,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val buf = new Array[Array[T]](p.size) -f.runJob(self, +val job = f.submitJob(self, (it: Iterator[T]) => it.take(left).toArray, p, (index: Int, data: Array[T]) => buf(index) = data, Unit) - -buf.foreach(results ++= _.take(num - results.size)) -partsScanned += numPartsToTry +job.flatMap {case _ => --- End diff -- Removed the `case`, but retained the `_`, as I think it makes it clear that we're not using the argument.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864566 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -276,10 +219,11 @@ class ComplexFutureAction[T] extends FutureAction[T] { override def value: Option[Try[T]] = p.future.value - def jobIds: Seq[Int] = jobs + def jobIds: Seq[Int] = subActions flatMap {_.jobIds} --- End diff -- Done.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864560 --- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala --- @@ -66,14 +65,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] - -f.run { - // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which - // is a cached thread pool. - val results = new ArrayBuffer[T](num) - val totalParts = self.partitions.length - var partsScanned = 0 - while (results.size < num && partsScanned < totalParts) { +// Cached thread pool to handle aggregation of subtasks. +implicit val executionContext = AsyncRDDActions.futureExecutionContext +val results = new ArrayBuffer[T](num) +val totalParts = self.partitions.length + +/* + Recursively triggers jobs to scan partitions until either the requested + number of elements are retrieved, or the partitions to scan are exhausted. + This implementation is non-blocking, asynchronously handling the + results of each job and triggering the next job using callbacks on futures. + */ +def continue(partsScanned : Int) : Future[Seq[T]] = + if (results.size >= num || partsScanned >= totalParts) { +Future.successful(results.toSeq) + } --- End diff -- Fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864605 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@DeveloperApi class ComplexFutureAction[T] extends FutureAction[T] { - // Pointer to the thread that is executing the action. It is set when the action is run. - @volatile private var thread: Thread = _ + @volatile private var _cancelled = false - // A flag indicating whether the future has been cancelled. This is used in case the future - // is cancelled before the action was even run (and thus we have no thread to interrupt). - @volatile private var _cancelled: Boolean = false - - @volatile private var jobs: Seq[Int] = Nil + @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = promise[T]() + private val p = Promise[T]() - override def cancel(): Unit = this.synchronized { + override def cancel(): Unit = synchronized { _cancelled = true -if (thread != null) { - thread.interrupt() -} +p.tryFailure(new SparkException("Action has been cancelled")) +subActions.foreach {_.cancel()} --- End diff -- I personally like the curly braces because it makes it clear that it's a code block, but I'll change it anyway, because IntelliJ tends to get a little trigger-happy with auto-formatting when it sees them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864614 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@DeveloperApi class ComplexFutureAction[T] extends FutureAction[T] { - // Pointer to the thread that is executing the action. It is set when the action is run. - @volatile private var thread: Thread = _ + @volatile private var _cancelled = false - // A flag indicating whether the future has been cancelled. This is used in case the future - // is cancelled before the action was even run (and thus we have no thread to interrupt). - @volatile private var _cancelled: Boolean = false - - @volatile private var jobs: Seq[Int] = Nil + @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = promise[T]() + private val p = Promise[T]() - override def cancel(): Unit = this.synchronized { + override def cancel(): Unit = synchronized { _cancelled = true -if (thread != null) { - thread.interrupt() -} +p.tryFailure(new SparkException("Action has been cancelled")) +subActions.foreach {_.cancel()} } /** * Executes some action enclosed in the closure. To properly enable cancellation, the closure * should use runJob implementation in this promise. See takeAsync for example. */ - def run(func: => T)(implicit executor: ExecutionContext): this.type = { -scala.concurrent.future { - thread = Thread.currentThread - try { -p.success(func) - } catch { -case e: Exception => p.failure(e) - } finally { -// This lock guarantees when calling `thread.interrupt()` in `cancel`, -// thread won't be set to null. 
-ComplexFutureAction.this.synchronized { - thread = null -} - } -} + def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { +p.tryCompleteWith(func) this } /** - * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext * to enable cancellation. */ - def runJob[T, U, R]( + def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R) { + resultFunc: => R): FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. -val job = this.synchronized { - if (!isCancelled) { -rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - } else { -throw new SparkException("Action has been cancelled") - } -} - -this.jobs = jobs ++ job.jobIds - -// Wait for the job to complete. If the action is cancelled (with an interrupt), -// cancel the job and stop the execution. This is not in a synchronized block because -// Await.ready eventually waits on the monitor in FutureJob.jobWaiter. -try { - Await.ready(job, Duration.Inf) -} catch { - case e: InterruptedException => -job.cancel() -throw new SparkException("Action has been cancelled") +if (!isCancelled) { + val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) + subActions = job::subActions --- End diff -- I don't think I've ever seen it done that way, and ScalaStyle doesn't seem to care. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864643 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark._ --- End diff -- I think IntelliJ did that automatically because the number of things being imported exceeded some threshold.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864625 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -116,57 +119,27 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { -if (!atMost.isFinite()) { - awaitResult() -} else jobWaiter.synchronized { - val finishTime = System.currentTimeMillis() + atMost.toMillis - while (!isCompleted) { -val time = System.currentTimeMillis() -if (time >= finishTime) { - throw new TimeoutException -} else { - jobWaiter.wait(finishTime - time) -} - } -} +jobWaiter.completionFuture.ready(atMost) this } @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { -ready(atMost)(permit) -awaitResult() match { - case scala.util.Success(res) => res - case scala.util.Failure(e) => throw e -} +jobWaiter.completionFuture.ready(atMost) +assert(value.isDefined, "Future has not completed properly") +value.get.get } override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) { -executor.execute(new Runnable { - override def run() { -func(awaitResult()) - } -}) +jobWaiter.completionFuture onComplete {_ => func(value.get)} } override def isCompleted: Boolean = jobWaiter.jobFinished override def isCancelled: Boolean = _cancelled - override def value: Option[Try[T]] = { -if (jobWaiter.jobFinished) { - Some(awaitResult()) -} else { - None -} - } - - private def awaitResult(): Try[T] = { -jobWaiter.awaitResult() match { - case JobSucceeded => scala.util.Success(resultFunc) - case JobFailed(e: Exception) => scala.util.Failure(e) -} - } + override def value: Option[Try[T]] = +jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}} --- End diff -- Does not compile. 
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864655 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -116,57 +119,27 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { -if (!atMost.isFinite()) { - awaitResult() -} else jobWaiter.synchronized { - val finishTime = System.currentTimeMillis() + atMost.toMillis - while (!isCompleted) { -val time = System.currentTimeMillis() -if (time >= finishTime) { - throw new TimeoutException -} else { - jobWaiter.wait(finishTime - time) -} - } -} +jobWaiter.completionFuture.ready(atMost) this } @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { -ready(atMost)(permit) -awaitResult() match { - case scala.util.Success(res) => res - case scala.util.Failure(e) => throw e -} +jobWaiter.completionFuture.ready(atMost) +assert(value.isDefined, "Future has not completed properly") +value.get.get } override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) { -executor.execute(new Runnable { - override def run() { -func(awaitResult()) - } -}) +jobWaiter.completionFuture onComplete {_ => func(value.get)} } override def isCompleted: Boolean = jobWaiter.jobFinished override def isCancelled: Boolean = _cancelled - override def value: Option[Try[T]] = { -if (jobWaiter.jobFinished) { - Some(awaitResult()) -} else { - None -} - } - - private def awaitResult(): Try[T] = { -jobWaiter.awaitResult() match { - case JobSucceeded => scala.util.Success(resultFunc) - case JobFailed(e: Exception) => scala.util.Failure(e) -} - } + override def value: Option[Try[T]] = +jobWaiter.completionFuture.value.map {res => res.map {_ => resultFunc}} --- End diff -- I've replaced the curly braces with parentheses, though. 
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864690 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,50 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]) +(starter: => Semaphore) : Unit = { --- End diff -- Keeping it this way leaves the call sites a lot less cluttered.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864838 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@DeveloperApi class ComplexFutureAction[T] extends FutureAction[T] { - // Pointer to the thread that is executing the action. It is set when the action is run. - @volatile private var thread: Thread = _ + @volatile private var _cancelled = false - // A flag indicating whether the future has been cancelled. This is used in case the future - // is cancelled before the action was even run (and thus we have no thread to interrupt). - @volatile private var _cancelled: Boolean = false - - @volatile private var jobs: Seq[Int] = Nil + @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = promise[T]() + private val p = Promise[T]() - override def cancel(): Unit = this.synchronized { + override def cancel(): Unit = synchronized { _cancelled = true -if (thread != null) { - thread.interrupt() -} +p.tryFailure(new SparkException("Action has been cancelled")) +subActions.foreach {_.cancel()} } /** * Executes some action enclosed in the closure. To properly enable cancellation, the closure * should use runJob implementation in this promise. See takeAsync for example. */ - def run(func: => T)(implicit executor: ExecutionContext): this.type = { -scala.concurrent.future { - thread = Thread.currentThread - try { -p.success(func) - } catch { -case e: Exception => p.failure(e) - } finally { -// This lock guarantees when calling `thread.interrupt()` in `cancel`, -// thread won't be set to null. 
-ComplexFutureAction.this.synchronized { - thread = null -} - } -} + def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { +p.tryCompleteWith(func) this } /** - * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext * to enable cancellation. */ - def runJob[T, U, R]( + def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R) { + resultFunc: => R): FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. -val job = this.synchronized { - if (!isCancelled) { -rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - } else { -throw new SparkException("Action has been cancelled") - } -} - -this.jobs = jobs ++ job.jobIds - -// Wait for the job to complete. If the action is cancelled (with an interrupt), -// cancel the job and stop the execution. This is not in a synchronized block because -// Await.ready eventually waits on the monitor in FutureJob.jobWaiter. -try { - Await.ready(job, Duration.Inf) -} catch { - case e: InterruptedException => -job.cancel() -throw new SparkException("Action has been cancelled") +if (!isCancelled) { + val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) + subActions = job::subActions --- End diff -- I actually haven't seen Spark code that uses `::` at all (not that it doesn't exist - I just haven't looked at any files that use it). My understanding is that using `::` as the "cons" operator is a notation inherited from ML. 
All of the (S)ML examples I have ever seen use it without any spaces, though it seems that Scala examples don't necessarily follow the same style convention. To me, using it without the spaces reads better, since it usually looks like a list of elements chained together, but it doesn't really matter to me either way.
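A minimal illustration of the spacing question under discussion (the `ConsDemo` object is invented for this example): `::` is Scala's right-associative list "cons" operator, it prepends in O(1), and both spellings compile to the same list, so the choice really is purely stylistic.

```scala
// Both spellings of the cons operator build the identical immutable list;
// operators in Scala do not require surrounding spaces.
object ConsDemo {
  val noSpaces   = 1::2::Nil     // the (S)ML-style spelling
  val withSpaces = 1 :: 2 :: Nil // the spelling ultimately adopted in the PR
}
```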
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44864860 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,50 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]) +(starter: => Semaphore) : Unit = { --- End diff -- To clarify: if I put the parameters in the same parameter list, the compiler forces me to be a lot more verbose at the call sites (with curly braces, `=>`, and so forth).
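The point about parameter lists can be sketched with a toy version of the helper (the `CurryDemo` object and its simplified signature are invented here, not the actual suite code): a by-name parameter placed in its own second list can be supplied with a bare expression or braces at the call site, with no explicit `() => ...` lambda.

```scala
// Hypothetical sketch of why testAsyncAction uses a second parameter list:
// `starter: => Int` is by-name, so the call site can pass a plain value or
// block instead of wrapping it in a function literal.
object CurryDemo {
  def testAction[R](action: List[Int] => R)(starter: => Int): R = {
    starter            // evaluate the by-name argument once
    action(List(1, 2, 3))
  }

  // Uncluttered call site: brace-style second argument, no `=>` needed.
  val result: Int = testAction(xs => xs.sum) { 0 }
}
```

With both parameters in one list, `starter` would have to be a `() => Int` and every call site would need an explicit lambda, which is exactly the verbosity the comment describes.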
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44865053 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -177,80 +150,50 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ +@DeveloperApi class ComplexFutureAction[T] extends FutureAction[T] { - // Pointer to the thread that is executing the action. It is set when the action is run. - @volatile private var thread: Thread = _ + @volatile private var _cancelled = false - // A flag indicating whether the future has been cancelled. This is used in case the future - // is cancelled before the action was even run (and thus we have no thread to interrupt). - @volatile private var _cancelled: Boolean = false - - @volatile private var jobs: Seq[Int] = Nil + @volatile private var subActions: List[FutureAction[_]] = Nil // A promise used to signal the future. - private val p = promise[T]() + private val p = Promise[T]() - override def cancel(): Unit = this.synchronized { + override def cancel(): Unit = synchronized { _cancelled = true -if (thread != null) { - thread.interrupt() -} +p.tryFailure(new SparkException("Action has been cancelled")) +subActions.foreach {_.cancel()} } /** * Executes some action enclosed in the closure. To properly enable cancellation, the closure * should use runJob implementation in this promise. See takeAsync for example. */ - def run(func: => T)(implicit executor: ExecutionContext): this.type = { -scala.concurrent.future { - thread = Thread.currentThread - try { -p.success(func) - } catch { -case e: Exception => p.failure(e) - } finally { -// This lock guarantees when calling `thread.interrupt()` in `cancel`, -// thread won't be set to null. 
-ComplexFutureAction.this.synchronized { - thread = null -} - } -} + def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { +p.tryCompleteWith(func) this } /** - * Runs a Spark job. This is a wrapper around the same functionality provided by SparkContext + * Submit a job for execution and return a FutureAction holding the result. + * This is a wrapper around the same functionality provided by SparkContext * to enable cancellation. */ - def runJob[T, U, R]( + def submitJob[T, U, R]( rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit, - resultFunc: => R) { + resultFunc: => R): FutureAction[R] = synchronized { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. -val job = this.synchronized { - if (!isCancelled) { -rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) - } else { -throw new SparkException("Action has been cancelled") - } -} - -this.jobs = jobs ++ job.jobIds - -// Wait for the job to complete. If the action is cancelled (with an interrupt), -// cancel the job and stop the execution. This is not in a synchronized block because -// Await.ready eventually waits on the monitor in FutureJob.jobWaiter. -try { - Await.ready(job, Duration.Inf) -} catch { - case e: InterruptedException => -job.cancel() -throw new SparkException("Action has been cancelled") +if (!isCancelled) { + val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) + subActions = job::subActions --- End diff -- I searched for "ml cons operator" in Google, and of all the hits on the first page that included (ML, SML, or F#) code samples, all but one omitted the spaces. Anyway, I've inserted spaces for the sake of consistency.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-156768512 It sure would be nice if Spark's unit tests were consistent. I add whitespace and suddenly the build fails?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-156769951 Unrelated to this PR (except that it may be partially responsible for the most recent test failure), there appears to be a race condition here:

```scala
if (!executionContext.isShutdown) {
  val f = Future { deleteFiles() }
```

(from `FileBasedWriteAheadLog.clean` in spark-streaming) If the `ExecutionContext` shuts down after `isShutdown` is called but before the task underlying the `Future` is enqueued on it, an exception will be thrown, which appears to be what's happening in `CommonWriteAheadLogTests.logCleanUpTest`. I'm not certain why the `ExecutionContext` would be getting shut down, but the failing test does have an awfully short timeout, and according to the stack trace, the thread pool is very busy at the time of the failure.
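The race described above is a classic check-then-act problem. A minimal sketch of one way to close the window (the `SafeSubmit` object is hypothetical, not Spark's code; it uses `ExecutionContext.execute` directly, since how `Future { ... }` surfaces a rejection can vary by Scala version): rather than checking `isShutdown` first, guard the submission itself, because a shut-down executor rejects new tasks atomically.

```scala
import java.util.concurrent.RejectedExecutionException
import scala.concurrent.ExecutionContext

// Hypothetical sketch: `isShutdown` followed by a separate submit is not
// atomic, so the pool can shut down in between. Catching the rejection at
// the submission site closes that window.
object SafeSubmit {
  def submitIfRunning(task: Runnable)(implicit ec: ExecutionContext): Boolean =
    try { ec.execute(task); true }
    catch {
      // Thrown when the executor shut down between any earlier
      // isShutdown check and this submission.
      case _: RejectedExecutionException => false
    }
}
```

Against a live pool the submission succeeds; after shutdown it is rejected and mapped to `false` instead of propagating the exception.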
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-156295760 Is there anything left to be done before merging this?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44564647 --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala --- @@ -197,4 +197,31 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim Await.result(f, Duration(20, "milliseconds")) } } + + private def testAsyncAction[R](action : RDD[Int] => FutureAction[R]) : Unit = { +val executorInvoked = Promise[Unit] +val fakeExecutionContext = new ExecutionContext { + override def execute(runnable: Runnable): Unit = { +executorInvoked.success(()) + } + override def reportFailure(t: Throwable): Unit = () +} +/* + We sleep here so that we get to the assertion before the job completes. + I wish there were a cleaner way to do this, but trying to use any sort of synchronization + with this fails due to task serialization. +*/ +val rdd = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}) --- End diff -- I just copied the style of the test that appears immediately above this.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44564235 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -20,13 +20,16 @@ package org.apache.spark import java.util.Collections import java.util.concurrent.TimeUnit -import org.apache.spark.api.java.JavaFutureAction -import org.apache.spark.rdd.RDD -import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter} +import org.apache.spark.annotation.DeveloperApi --- End diff -- I blame IntelliJ. Fixed. :-)
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44536897 --- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala --- @@ -105,6 +108,7 @@ trait FutureAction[T] extends Future[T] { * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ +@DeveloperApi --- End diff -- I agree. I was afraid someone would yell at me if I started changing classes from public to private, though.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44534066

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -105,6 +108,7 @@ trait FutureAction[T] extends Future[T] {
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
+@DeveloperApi
--- End diff --

Why are we exposing these classes as "stable API"? All of the public methods of `AsyncRDDFunctions` return `FutureAction`, not `SimpleFutureAction` or `ComplexFutureAction`. The fact that the latter two classes exist at all is an implementation detail.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155967768

Well, the tests that randomly fail passed this time. Yay?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155995354

Build failed again, in a test that has nothing to do with this PR. Those tests seem extremely flaky.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44603713

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
+    value.get.get
--- End diff --

Very well. I'll add an assertion.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155945709

The Jenkins build has been failing due to ForkErrors being thrown by random tests that have nothing to do with anything I changed. The error itself has no message attached, so I have no idea what its problem is.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44576601

--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,18 @@ private[spark] class JobWaiter[T](
     dagScheduler.cancelJob(jobId)
   }

-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
-    resultHandler(index, result.asInstanceOf[T])
-    finishedTasks += 1
-    if (finishedTasks == totalTasks) {
-      _jobFinished = true
-      jobResult = JobSucceeded
-      this.notifyAll()
+  override def taskSucceeded(index: Int, result: Any): Unit = {
+    /*
+     The resultHandler call must be synchronized in case resultHandler itself
+     is not thread safe.
+     */
+    synchronized(resultHandler(index, result.asInstanceOf[T]))
--- End diff --

Fixed.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44576577

--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -28,17 +32,15 @@ private[spark] class JobWaiter[T](
     resultHandler: (Int, T) => Unit)
   extends JobListener {

-  private var finishedTasks = 0
-
-  // Is the job as a whole finished (succeeded or failed)?
-  @volatile
-  private var _jobFinished = totalTasks == 0
-
-  def jobFinished: Boolean = _jobFinished
-
+  private val finishedTasks = new AtomicInteger(0)
   // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
   // partition RDDs), we set the jobResult directly to JobSucceeded.
-  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
+  private val jobPromise: Promise[Unit] =
+    if (totalTasks == 0) Promise.successful(()) else Promise()
+
+  def jobFinished: Boolean = jobPromise.isCompleted
+
+  def completionFuture: Future[Unit] = jobPromise.future
--- End diff --

Fixed.
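The Promise-backed completion pattern in the diff above can be sketched in isolation. `MiniJobWaiter` below is a hypothetical stand-in (names and simplifications are mine, not Spark's actual `JobWaiter`), showing how task completions feed a `Promise` whose exposed `Future` replaces the old wait/notify handshake:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Hypothetical sketch of the Promise-backed completion pattern; not Spark code.
class MiniJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  // A zero-task job is complete from the start, mirroring the comment in the diff.
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  def jobFinished: Boolean = jobPromise.isCompleted
  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: T): Unit = {
    // Synchronized in case resultHandler itself is not thread safe.
    synchronized(resultHandler(index, result))
    if (finishedTasks.incrementAndGet() == totalTasks) {
      jobPromise.success(())
    }
  }
}
```

Because completion is observed through `completionFuture`, waiters can register non-blocking callbacks instead of parking on a monitor.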
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44568209

--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -66,14 +65,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    */
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
     val f = new ComplexFutureAction[Seq[T]]
-
-    f.run {
-      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
-      // is a cached thread pool.
-      val results = new ArrayBuffer[T](num)
-      val totalParts = self.partitions.length
-      var partsScanned = 0
-      while (results.size < num && partsScanned < totalParts) {
+    // Cached thread pool to handle aggregation of subtasks.
+    implicit val executionContext = AsyncRDDActions.futureExecutionContext
+    val results = new ArrayBuffer[T](num)
+    val totalParts = self.partitions.length
+
+    /*
+     Recursively triggers jobs to scan partitions until either the requested
+     number of elements are retrieved, or the partitions to scan are exhausted.
+     This implementation is non-blocking, asynchronously handling the
+     results of each job and triggering the next job using callbacks on futures.
+     */
+    def continue(partsScanned: Int): Future[Seq[T]] =
--- End diff --

Such as? Most of the body of this function was existing code that I just moved into here.
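The recursive, non-blocking scan loop described in the diff's comment can be illustrated with a standalone sketch. `scanPartition` and `continue` below are hypothetical stand-ins (not Spark code) for the per-partition job and the future-chained continuation; the point is that the "loop" advances inside a `flatMap` callback rather than by blocking a thread in a `while` loop:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Stand-in for submitting one job over a single partition (assumption:
// each partition here just yields its own index as one element).
def scanPartition(part: Int): Future[Seq[Int]] =
  Future(Seq(part))

// Recursively triggers "jobs" until enough elements are collected or the
// partitions are exhausted; each step chains the next via flatMap.
def continue(partsScanned: Int, totalParts: Int, needed: Int,
             acc: Seq[Int]): Future[Seq[Int]] =
  if (acc.size >= needed || partsScanned >= totalParts)
    Future.successful(acc)
  else
    scanPartition(partsScanned).flatMap { elems =>
      continue(partsScanned + 1, totalParts, needed, (acc ++ elems).take(needed))
    }
```

Note the recursion is safe with respect to stack depth: each recursive call happens in a fresh callback, not on the caller's stack.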
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44570490

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -20,13 +20,16 @@ package org.apache.spark

 import java.util.Collections
 import java.util.concurrent.TimeUnit

-import org.apache.spark.api.java.JavaFutureAction
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
+import org.apache.spark.annotation.DeveloperApi
--- End diff --

Fixed.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44568927

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
--- End diff --

I don't think so. `Future.ready` takes an implicit `CanAwait` as a parameter, and in this case we're using the `CanAwait` passed in via the enclosing call. `Await.ready` would completely ignore the passed-in `CanAwait` and pass its own. The whole point of `CanAwait` is to verify that the developer explicitly intended to block. At this point in the code, that verification has already been done (by virtue of the `CanAwait` from the enclosing call).
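The delegation being defended here can be demonstrated with a minimal `Awaitable` wrapper. `WrappedFuture` is an illustrative name (not Spark's `SimpleFutureAction`); it shows the enclosing call's implicit `CanAwait` permit flowing straight into the underlying `Future.ready`, with no second trip through `Await`:

```scala
import scala.concurrent.{Await, Awaitable, CanAwait, Future}
import scala.concurrent.duration.Duration

// Illustrative wrapper: forwards the caller's CanAwait permit implicitly,
// rather than discarding it by calling Await.ready (which supplies its own).
class WrappedFuture[T](underlying: Future[T]) extends Awaitable[T] {
  override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    underlying.ready(atMost) // permit from the enclosing call is in implicit scope
    this
  }
  override def result(atMost: Duration)(implicit permit: CanAwait): T = {
    underlying.ready(atMost)
    underlying.value.get.get // completed by now, or ready would have thrown
  }
}
```

Only `Await.ready`/`Await.result` (at the outermost call site) ever manufacture a `CanAwait`, which is exactly the "developer explicitly intended to block" check described in the comment.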
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44570514

--- Diff: core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---
@@ -95,19 +102,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
       val buf = new Array[Array[T]](p.size)
-      f.runJob(self,
+      val job = f.runJob(self,
         (it: Iterator[T]) => it.take(left).toArray,
         p,
         (index: Int, data: Array[T]) => buf(index) = data,
         Unit)
-
-      buf.foreach(results ++= _.take(num - results.size))
-      partsScanned += numPartsToTry
+      job flatMap { case _ =>
--- End diff --

Fixed.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44573118

--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,14 @@ private[spark] class JobWaiter[T](
     dagScheduler.cancelJob(jobId)
   }

-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
+  override def taskSucceeded(index: Int, result: Any): Unit = {
--- End diff --

Hmm... I see what you mean. While nothing within `JobWaiter` itself needs protection here, in probably all cases `resultHandler` is writing to an array. That seems like it should be okay, since each thread would be writing to a different index in the array, but in practice probably isn't, due to funkiness in the Java memory model. At the very least, synchronization is necessary to ensure that all writes have been flushed before returning the final result back from the various incarnations of `runJob`. I'll wrap the `resultHandler` call in a `synchronized` block and add a comment.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44577065

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -229,28 +183,15 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R) {
+      resultFunc: => R): FutureAction[R] = synchronized {
--- End diff --

Should we rename it to `submitJob`?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155894205

@andrewor14 Thanks. I've fixed the issues you pointed out and anything else I could find. See my question above regarding `resultFunc`, though.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44570092

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +119,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
+    value.get.get
--- End diff --

That seems sort of redundant, given that `value.get` will throw an exception if the future somehow isn't completed (which anyway isn't possible on the line immediately after waiting for the future to complete).
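The argument above rests on the contract of `Future.value`: it is `None` until the future completes and `Some(Try[T])` afterwards, so `.value.get.get` immediately after a successful `ready(...)` cannot hit the incomplete case. A small self-contained illustration (plain Scala futures, not Spark code):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Future.value is None before completion, Some(Try[T]) after, so the
// outer .get is safe once ready(...) has returned normally.
val f = Future { 21 * 2 }
Await.ready(f, Duration.Inf)      // blocks until f is complete
val result: Int = f.value.get.get // Some(Success(42)) -> Success(42) -> 42
```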
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155877448

It seems like Jenkins is not in a happy place:

```
 > git fetch --tags --progress https://github.com/apache/spark.git +refs/pull/9264/*:refs/remotes/origin/pr/9264/* # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from https://github.com/apache/spark.git
	at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:763)
	at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1012)
	at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1043)
	at hudson.scm.SCM.checkout(SCM.java:485)
	at hudson.model.AbstractProject.checkout(AbstractProject.java:1275)
	at
```
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44574415

--- Diff: core/src/main/scala/org/apache/spark/FutureAction.scala ---
@@ -116,57 +120,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }

   override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = {
-    if (!atMost.isFinite()) {
-      awaitResult()
-    } else jobWaiter.synchronized {
-      val finishTime = System.currentTimeMillis() + atMost.toMillis
-      while (!isCompleted) {
-        val time = System.currentTimeMillis()
-        if (time >= finishTime) {
-          throw new TimeoutException
-        } else {
-          jobWaiter.wait(finishTime - time)
-        }
-      }
-    }
+    jobWaiter.completionFuture.ready(atMost)
     this
   }

   @throws(classOf[Exception])
   override def result(atMost: Duration)(implicit permit: CanAwait): T = {
-    ready(atMost)(permit)
-    awaitResult() match {
-      case scala.util.Success(res) => res
-      case scala.util.Failure(e) => throw e
-    }
+    jobWaiter.completionFuture.ready(atMost)
+    value.get.get
   }

   override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) {
-    executor.execute(new Runnable {
-      override def run() {
-        func(awaitResult())
-      }
-    })
+    jobWaiter.completionFuture onComplete { _ => func(value.get) }
   }

   override def isCompleted: Boolean = jobWaiter.jobFinished

   override def isCancelled: Boolean = _cancelled

-  override def value: Option[Try[T]] = {
-    if (jobWaiter.jobFinished) {
-      Some(awaitResult())
-    } else {
-      None
-    }
-  }
-
-  private def awaitResult(): Try[T] = {
-    jobWaiter.awaitResult() match {
-      case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception) => scala.util.Failure(e)
-    }
-  }
+  override def value: Option[Try[T]] =
+    jobWaiter.completionFuture.value map { res => res map { _ => resultFunc } }
--- End diff --

Fixed.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44576009

--- Diff: core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---
@@ -49,29 +51,14 @@ private[spark] class JobWaiter[T](
     dagScheduler.cancelJob(jobId)
   }

-  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
-    if (_jobFinished) {
-      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-    }
+  override def taskSucceeded(index: Int, result: Any): Unit = {
--- End diff --

I wonder... do we need to also put some sort of synchronization around `resultFunc` in `SimpleFutureAction` to ensure that the thread that reads the result can see all the writes?
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155642041

Why do we care about binary compatibility for an internal API that is only used in one place? In any case, the implicit ExecutionContext being passed to `ComplexFutureAction.runJob` apparently wasn't being used. I don't really know what to do about the removed `Thread` field, since it's not needed and it's not a public API. `SimpleFutureAction` and `ComplexFutureAction` really ought to be marked `private[spark]` anyway.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155293780

@squito I merged with master 2 days ago, and more conflicts were introduced on master less than 24 hours later. I'll merge one more time in order to fix the issue with the accidentally-deleted Utils class, but I won't do it again after that until it's the only thing left to do.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155299515

Back up to date with master. All style issues should be resolved.
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-155308047

"[info] spark-mllib: found 0 potential binary incompatibilities (filtered 64)
java.lang.RuntimeException: spark-core: Binary compatibility check failed!"

uh wat?!
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44371154

--- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
@@ -197,4 +197,30 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
     Await.result(f, Duration(20, "milliseconds"))
   }
 }
+
+  test("SimpleFutureAction callback must not consume a thread while waiting") {
+    val executorInvoked = Promise[Unit]
+    val fakeExecutionContext = new ExecutionContext {
+      override def execute(runnable: Runnable): Unit = {
+        executorInvoked.success(())
+      }
+      override def reportFailure(t: Throwable): Unit = ???
+    }
+    val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => { Thread.sleep(1000L); itr }).countAsync()
+    f.onComplete(_ => ())(fakeExecutionContext)
+    assert(!executorInvoked.isCompleted)
--- End diff --

I tried two other approaches (one using semaphores, one using actors) to come up with a "non-flaky" solution, but neither worked, because the tasks get serialized and deserialized even when running in local mode. It seems that `Thread.sleep` is the only viable approach, as ugly as it is. The async action should just go away after the job completes. I'll add a comment.
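The probe trick in the test above generalizes: hand the callback a fake `ExecutionContext` and assert that `execute` was not invoked until completion. `ProbeExecutionContext` below is a hypothetical helper (not part of Spark or this test suite) sketching the same idea without needing a `SparkContext`:

```scala
import scala.concurrent.{ExecutionContext, Promise}

// Probe ExecutionContext that records whether execute() was ever invoked,
// so a test can assert that registering a callback did not immediately
// consume a thread.
class ProbeExecutionContext extends ExecutionContext {
  @volatile var invoked = false
  override def execute(runnable: Runnable): Unit = {
    invoked = true
    runnable.run() // run inline; adequate for a single-threaded test
  }
  override def reportFailure(t: Throwable): Unit = ()
}
```

Registering a callback on an incomplete future merely stores it; only completion dispatches it to the executor, which is exactly what the probe observes.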
[GitHub] spark pull request: [SPARK-9026] Modifications to JobWaiter, Futur...
Github user reggert commented on a diff in the pull request: https://github.com/apache/spark/pull/9264#discussion_r44370757

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala ---
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.aggregate
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType}
-
-/**
- * Utility functions used by the query planner to convert our plan to new aggregation code path.
- */
-object Utils {
--- End diff --

Git somehow managed to screw up the merge when I re-sync'd with the upstream master. I'll resurrect it.
[GitHub] spark pull request: [SPARK-11296] [SPARK-9026] Modifications to Jo...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-154877812

What else needs to be done to get this PR merged?
[GitHub] spark pull request: [SPARK-11296] [SPARK-9026] Modifications to Jo...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-154751770

I've added some unit tests that verify that my changes do, indeed, stop callbacks from consuming threads before the jobs have completed.
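A minimal, standalone sketch (not the actual Spark test code) of what such a test can check: registering an `onComplete` callback on a not-yet-completed future returns to the caller immediately, and the callback only runs once the job's promise is completed, so no thread is parked in between. The object and method names here are hypothetical.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global

object NonBlockingCallbackCheck {
  // Returns true if registering the callback returned to the caller
  // before the simulated job completed, i.e. the registration did not
  // block waiting for a result.
  def registerReturnsImmediately(): Boolean = {
    val promise = Promise[Int]()
    val fired = new CountDownLatch(1)

    // Registering the callback must not block: the promise is still open.
    promise.future.onComplete(_ => fired.countDown())

    // If registration had blocked until completion, the latch would
    // already be at zero here.
    val returnedBeforeCompletion = fired.getCount == 1

    promise.success(7) // simulate the scheduler completing the job
    fired.await()      // the callback now fires on the execution context
    returnedBeforeCompletion
  }

  def main(args: Array[String]): Unit =
    println(registerReturnsImmediately()) // prints true
}
```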
[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-150851578

I agree that the JIRA issue is a duplicate. Odd that the old one didn't turn up when I searched for existing issues. I've linked the issues in JIRA.
[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...
Github user reggert commented on the pull request: https://github.com/apache/spark/pull/9264#issuecomment-150851994

It looks like our approaches are very similar, though my changes to `JobWaiter` and `SimpleFutureAction` are a bit smaller, and I also included fixes for `ComplexFutureAction` and `AsyncRDDFunctions.takeAsync`. I haven't been able to fully test mine yet, though, because doing a full build of Spark causes my junky old laptop to go into seizures.
[GitHub] spark pull request: [SPARK-11296] Modifications to JobWaiter, Futu...
GitHub user reggert opened a pull request: https://github.com/apache/spark/pull/9264

[SPARK-11296] Modifications to JobWaiter, FutureAction, and AsyncRDDActions to support non-blocking operation

These changes rework the implementations of SimpleFutureAction, ComplexFutureAction, JobWaiter, and AsyncRDDActions such that asynchronous callbacks on the generated Futures NEVER block waiting for a job to complete. A small amount of mutex synchronization is necessary to protect the internal fields that manage cancellation, but these locks are only held very briefly and in practice should almost never cause any blocking to occur. The existing blocking APIs of these classes are retained, but they simply delegate to the underlying non-blocking API and `Await` the results with indefinite timeouts.

Associated JIRA ticket: https://issues.apache.org/jira/browse/SPARK-11296

This pull request contains all my own original work, which I release to the Spark project under its open source license.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/reggert/spark fix-futureaction

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9264.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9264

commit 4210aa6dc4d837988f101689bab6117404c00eb2
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date: 2015-10-24T02:30:18Z

    reworked FutureAction and JobWaiter to avoid blocking or consuming threads while waiting for jobs

commit 1ad1abdd995a6347b2a18a31260168a987fee06b
Author: Richard W. Eggert II <richard.egg...@gmail.com>
Date: 2015-10-24T16:17:09Z

    reworked ComplexFutureAction and AsyncRDDActions.takeAsync to be non-blocking
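The core idea described above — the scheduler completes a promise when the job finishes, callbacks attach to the resulting future, and the retained blocking API merely `Await`s that same future — can be sketched as follows. `SimpleJobWaiter` is a hypothetical simplification for illustration, not the actual Spark class.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical, simplified stand-in for Spark's JobWaiter/SimpleFutureAction:
// the scheduler completes the promise directly, so no thread is ever parked
// just to deliver a result to a callback.
final class SimpleJobWaiter[T] {
  private val promise = Promise[T]()

  // Called by the scheduler when the job finishes or fails.
  def jobSucceeded(result: T): Unit = promise.trySuccess(result)
  def jobFailed(cause: Throwable): Unit = promise.tryFailure(cause)

  // Non-blocking API: callbacks and transformations attach here.
  def completionFuture: Future[T] = promise.future

  // Retained blocking API: delegates to the non-blocking future and
  // awaits with an indefinite timeout, as the PR description says.
  def awaitResult(): T = Await.result(completionFuture, Duration.Inf)
}

object WaiterDemo {
  def run(): Int = {
    val waiter = new SimpleJobWaiter[Int]
    // Attaching a map does not block; it runs once the job completes.
    val doubled = waiter.completionFuture.map(_ * 2)
    waiter.jobSucceeded(21) // simulate the scheduler finishing the job
    Await.result(doubled, Duration.Inf)
  }

  def main(args: Array[String]): Unit = println(run()) // prints 42
}
```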