GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/14854

    [SPARK-17283][WIP][Core] Cancel job in RDD.take() as soon as enough output 
is received

    ## What changes were proposed in this pull request?
    
    This patch introduces a new internal implementation of `RDD.take()` which 
halts the Spark job as soon as enough rows have been received. This can offer 
substantial performance improvements for interactive queries. For example, 
consider
    
    ```
    sc.parallelize(1 to 100000, 100000).filter(_ > 10000).take(1)
    ```
    
    In the current `take()` implementation, this will end up computing almost 
every partition of the RDD even though only the first 10% of partitions must be 
computed to return the desired output. With this patch's changes, Spark will 
cancel the running job as soon as enough output has been received, greatly 
improving responsiveness for limit queries with highly-selective filters.
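    The pre-patch behavior can be illustrated with a plain-Scala sketch (no Spark involved; the name `naiveTake` and the fixed scale-up factor of 4 are simplifications of the real `RDD.take()` driver loop, not Spark's actual code):

    ```scala
    // Simplified model of the old RDD.take(num) driver loop: partitions are
    // scanned in waves that grow by a scale-up factor, and each wave runs to
    // completion before its results are inspected, even if the first
    // partition in the wave already produced enough rows.
    def naiveTake[T](partitions: Seq[Seq[T]], num: Int): Seq[T] = {
      val buf = scala.collection.mutable.ArrayBuffer.empty[T]
      var scanned = 0
      var numPartsToTry = 1
      while (buf.size < num && scanned < partitions.size) {
        val upTo = math.min(scanned + numPartsToTry, partitions.size)
        // One synchronous "job" over partitions [scanned, upTo).
        for (p <- scanned until upTo) {
          buf ++= partitions(p).take(num - buf.size)
        }
        scanned = upTo
        numPartsToTry *= 4 // grow the next wave
      }
      buf.toSeq
    }
    ```

    With a highly-selective filter, most partitions in the final wave contribute nothing, yet the wave still runs to completion; that wasted work is what the patch cancels.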
    
    Although the changes here are implemented in Spark core, they also benefit 
Spark SQL; for example, the following query sees a similar speedup:
    
    ```
    sc.parallelize(1 to 100000, 100000).toDF.filter("value > 10000").take(1)
    ```
    
    **Caveat:** the performance optimizations implemented here do not apply to 
`AsyncRDDActions.takeAsync()`. The `takeAsync()` codepath is fairly complicated 
and, to the best of my knowledge, is not very widely used, so I'd like to 
postpone any changes to it.
    
    This patch is marked as WIP pending tests and a few design details that I'd 
like to discuss (will comment inline).
    
    ### Implementation details
    
    **Task result buffering:** this new implementation of `take()` returns 
exactly the same answers as the old implementation. The old implementation 
first collected the per-partition results from all partitions and then 
processed them in ascending order of partition id. In order to match this 
behavior, the new implementation carefully buffers task outputs in order to 
process them in order. The code is extensively commented to explain how its 
bookkeeping logic allows this to be done efficiently.
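    The buffering idea can be sketched as follows (a simplified model with hypothetical names, not the patch's actual classes: task results may arrive in any order, but rows are released strictly in ascending partition-id order so the answer matches the old implementation):

    ```scala
    import scala.collection.mutable

    // Sketch of in-order result buffering. Out-of-order task results are
    // parked in `pending`; whenever the next expected partition arrives,
    // the now-contiguous prefix of partitions is flushed to `emitted`.
    class OrderedResultBuffer[T] {
      private val pending = mutable.Map.empty[Int, Seq[T]]
      private var nextPartition = 0
      private val emitted = mutable.ArrayBuffer.empty[T]

      // Record one task's output and flush any contiguous prefix.
      def onTaskResult(partitionId: Int, rows: Seq[T]): Unit = {
        pending(partitionId) = rows
        while (pending.contains(nextPartition)) {
          emitted ++= pending.remove(nextPartition).get
          nextPartition += 1
        }
      }

      // Rows available so far, in partition order.
      def results: Seq[T] = emitted.toSeq
    }
    ```

    The driver can then cancel the job as soon as `results` holds enough rows, since everything already emitted is final.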
    
    **Scheduler changes:** the existing `FutureAction.cancel()` method causes 
the Spark scheduler to mark the job as failed, which doesn't make sense in 
this context. To address this, this patch modifies the DAGScheduler to allow 
jobs to be cancelled while still marking them as successful. This is a slightly 
tricky change that will require discussion and more tests (to be added here).
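    The distinction the scheduler needs can be modelled in a few lines (a toy sketch with hypothetical names, not the real DAGScheduler API): cancelling a job because enough output has arrived should complete the waiter successfully rather than with a failure.

    ```scala
    import scala.util.{Failure, Success, Try}

    // Toy model of a job waiter whose cancellation can either fail the job
    // (the old FutureAction.cancel() behavior) or complete it successfully
    // with the output received so far (the behavior this patch adds).
    class CancellableJobWaiter[T] {
      private var outcome: Option[Try[Seq[T]]] = None
      private val received = scala.collection.mutable.ArrayBuffer.empty[T]

      def taskSucceeded(rows: Seq[T]): Unit =
        if (outcome.isEmpty) received ++= rows

      // Old-style cancel: the job is marked as failed.
      def cancelAsFailed(): Unit =
        if (outcome.isEmpty)
          outcome = Some(Failure(new RuntimeException("Job cancelled")))

      // New-style cancel: stop scheduling further tasks, but report
      // success with whatever results have already been received.
      def cancelAsSucceeded(): Unit =
        if (outcome.isEmpty) outcome = Some(Success(received.toSeq))

      def result: Try[Seq[T]] = outcome.getOrElse(Success(received.toSeq))
    }
    ```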
    
    ### Feature flags
    
    I have made this new implementation the default but have preserved the old 
`take()` implementations behind feature-flags.
    
    In Spark Core, the old `take()` implementation can be restored by setting 
`spark.driver.useOldTakeImplementation=true` in SparkConf.
    
    In Spark SQL, the old implementation can be used by setting 
`spark.sql.onlineTake.enabled=false` in SQLConf.
    
    ## How was this patch tested?
    
    TODO; tests pending. I plan to add the following tests:
    
    - Ensure that the driver thread (the thread which calls `take()`) is 
interrupted / unblocked in case the user cancels the active `take()` job via 
the web UI or via a job group.
    - Ensure that the console progress bar disappears after the job halts early.
    - Add separate unit tests for the new "cancel without failing job" 
scheduler functionality.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark take-early-stopping

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14854
    
----
commit 4c625788c3bd7c7c649937f95887653cd2cfc93f
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-27T21:30:08Z

    Initial commit of optimized take().

commit 0cd97eebeb0c6cb89fae850ef7e5299b1ddab480
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-27T22:25:17Z

    Fix negated feature flag.

commit ef5e9f73ea9cbe8aa71e781fc1cca0cb22957bc0
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T18:08:49Z

    Add scheduler path for canceling job without failing it.

commit 75640e2a08c41f571bacdd5fb7ef75e67eb62226
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T18:44:48Z

    Comment update.

commit 0751d776c19711e5c576ada8b46bfd79a5f50348
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-08-28T19:16:16Z

    Preserve partition-local limits in RDD take().

----

