Github user reggert commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9264#discussion_r44371154

    --- Diff: core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---
    @@ -197,4 +197,30 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
           Await.result(f, Duration(20, "milliseconds"))
         }
       }
    +
    +  test("SimpleFutureAction callback must not consume a thread while waiting") {
    +    val executorInvoked = Promise[Unit]
    +    val fakeExecutionContext = new ExecutionContext {
    +      override def execute(runnable: Runnable): Unit = {
    +        executorInvoked.success(())
    +      }
    +      override def reportFailure(t: Throwable): Unit = ???
    +    }
    +    val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).countAsync()
    +    f.onComplete(_ => ())(fakeExecutionContext)
    +    assert(!executorInvoked.isCompleted)
    --- End diff --

    I tried two other approaches (one using semaphores, one using actors) to come up with a "non-flaky" solution, but neither worked because the tasks get serialized and deserialized, even when running in local mode. It seems that "Thread.sleep" is the only viable approach, as ugly as it is. The async action should just go away after the job completes. I'll add a comment.
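    To illustrate why a semaphore captured by a task closure is probably of no use here, the following is a minimal standalone sketch, not the code from the abandoned attempts. It assumes the closure is copied by plain Java serialization; the object name SerializedSemaphoreSketch and the helper roundTrip are hypothetical, and no Spark API is involved. The deserialized semaphore is an independent copy, so a release on the task side never reaches the instance the test thread holds.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.Semaphore

// Hypothetical illustration: mimics what happens to an object captured by a
// task closure when the closure is serialized and then deserialized.
object SerializedSemaphoreSketch {

  // Round-trip a value through Java serialization and return the copy.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Semaphore(0)   // the instance the test thread would wait on
    val copy = roundTrip(original)    // the instance the task would actually see

    copy.release()                    // the "task" signals its own copy only

    // The original never receives the permit, so a test thread blocked on
    // original.acquire() would wait forever.
    println(s"original permits: ${original.availablePermits()}")
    println(s"copy permits: ${copy.availablePermits()}")
  }
}
```

    If that assumption holds, any in-memory signal shared between the test thread and the task is lost in the same way, which is consistent with falling back to "Thread.sleep" inside the task.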