GitHub user JoshRosen opened a pull request: https://github.com/apache/spark/pull/14854
[SPARK-17283][WIP][Core] Cancel job in RDD.take() as soon as enough output is received

## What changes were proposed in this pull request?

This patch introduces a new internal implementation of `RDD.take()` which halts the Spark job as soon as enough rows have been received. This can offer substantial performance improvements for interactive queries. For example, consider

```
sc.parallelize(1 to 100000, 100000).filter(_ > 10000).take(1)
```

The current `take()` implementation ends up computing almost every partition of the RDD even though only the first ~10% of partitions need to be computed to return the desired output. With this patch's changes, Spark cancels the running job as soon as enough output has been received, greatly improving responsiveness for limit queries with highly selective filters.

Although the changes here are implemented in Spark Core, they also benefit Spark SQL; the following query sees a similar speedup:

```
sc.parallelize(1 to 100000, 100000).toDF.filter("value > 10000").take(1)
```

**Caveat:** the performance optimizations implemented here do not apply to `AsyncRDDActions.takeAsync()`. The `takeAsync()` codepath is fairly complicated and, to the best of my knowledge, not very widely used, so I'd like to postpone any changes to it.

This patch is marked as WIP pending tests and a few design details that I'd like to discuss (I will comment inline).

### Implementation details

**Task result buffering:** this new implementation of `take()` returns exactly the same answers as the old implementation. The old implementation first collected the per-partition results from all partitions and then processed them in ascending order of partition id. To match this behavior, the new implementation carefully buffers task outputs so that they can be processed in order. The code is extensively commented to explain how its bookkeeping logic allows this to be done efficiently.
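The ordered-buffering idea described above can be sketched roughly as follows. This is a simplified illustration only, not the patch's actual code: `takeInOrder`, its signature, and the in-order delivery of results as a `Seq` are all hypothetical stand-ins for the real scheduler callback machinery.

```scala
import scala.collection.mutable

// Hypothetical sketch: task results may arrive out of partition order, so we
// buffer them and only emit rows once every earlier partition has been seen,
// stopping as soon as `num` rows have been gathered.
def takeInOrder[T](results: Seq[(Int, Seq[T])], numPartitions: Int, num: Int): Seq[T] = {
  val buffered = mutable.Map.empty[Int, Seq[T]]   // partitionId -> rows, not yet consumable
  val output = mutable.ArrayBuffer.empty[T]
  var nextPartitionToConsume = 0                   // lowest partition id not yet emitted

  val it = results.iterator
  while (output.size < num && nextPartitionToConsume < numPartitions && it.hasNext) {
    val (partitionId, rows) = it.next()
    buffered(partitionId) = rows
    // Drain any contiguous run of partitions starting at the next expected id.
    while (output.size < num && buffered.contains(nextPartitionToConsume)) {
      output ++= buffered.remove(nextPartitionToConsume).get.take(num - output.size)
      nextPartitionToConsume += 1
    }
  }
  output.toSeq
}
```

For example, if the results for partitions 2, 0, 1 arrive in that order and we ask for two rows, the partition-2 rows are buffered until partitions 0 and 1 have been emitted, so the answer matches what a sequential scan would return. In the real implementation this is the point where the job can be cancelled: once `output.size` reaches `num`, no further task results are needed.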
**Scheduler changes:** the existing `FutureAction.cancel()` method causes the Spark scheduler to mark the task as failed, which doesn't make sense in this context. To address this, this patch modifies the DAGScheduler to allow jobs to be cancelled while still being marked as successful. This is a slightly tricky change that will require discussion and more tests (to be added here).

### Feature flags

I have made this new implementation the default but have preserved the old `take()` implementations behind feature flags. In Spark Core, the old `take()` implementation can be restored by setting `spark.driver.useOldTakeImplementation=true` in SparkConf. In Spark SQL, the old implementation can be used by setting `spark.sql.onlineTake.enabled=false` in SQLConf.

## How was this patch tested?

TODO; tests pending. I plan to add the following tests:

- Ensure that the driver thread (the thread which calls `take()`) is interrupted / unblocked if the user cancels the active `take()` job via the web UI or via a job group.
- Ensure that the console progress bar disappears after the job halts early.
- Add separate unit tests for the new "cancel without failing the job" scheduler functionality.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark take-early-stopping

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14854.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14854

----

commit 4c625788c3bd7c7c649937f95887653cd2cfc93f
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-27T21:30:08Z

    Initial commit of optimized take().

commit 0cd97eebeb0c6cb89fae850ef7e5299b1ddab480
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-27T22:25:17Z

    Fix negated feature flag.
commit ef5e9f73ea9cbe8aa71e781fc1cca0cb22957bc0
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T18:08:49Z

    Add scheduler path for canceling job without failing it.

commit 75640e2a08c41f571bacdd5fb7ef75e67eb62226
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T18:44:48Z

    Comment update.

commit 0751d776c19711e5c576ada8b46bfd79a5f50348
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T19:16:16Z

    Preserve partition-local limits in RDD take().

----