GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/15068

    [SPARK-17514] df.take(1) and df.limit(1).collect() should perform the same 
in Python

    ## What changes were proposed in this pull request?
    
    In PySpark, `df.take(1)` runs a single-stage job which computes only one 
partition of the DataFrame, while `df.limit(1).collect()` computes all 
partitions and runs a two-stage job. This difference in performance is 
confusing.
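
    A minimal sketch of the comparison (the session setup, row count, and partition count below are illustrative, not part of the patch):

```python
from pyspark.sql import SparkSession

# Illustrative setup; any DataFrame with many partitions shows the difference.
spark = SparkSession.builder.appName("take-vs-limit-collect").getOrCreate()
df = spark.range(0, 10000000, numPartitions=100)

rows_via_take = df.take(1)               # single-stage job, scans as few partitions as possible
rows_via_limit = df.limit(1).collect()   # before this patch: two-stage job over all partitions

# Both return a single Row, but the work done to produce it differs greatly.
print(rows_via_take, rows_via_limit)
```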
    
    The reason why `limit(1).collect()` is so much slower is that `collect()` 
internally maps to `df.rdd.<some-pyspark-conversions>.toLocalIterator`, which 
causes Spark SQL to build a query where a global limit appears in the middle of 
the plan. This, in turn, is executed inefficiently because limits in the middle 
of plans are now implemented by repartitioning to a single task rather than by 
running a `take()` job on the driver (a change made in #7334, a patch that was 
a prerequisite to allowing partition-local limits to be pushed beneath unions, 
etc.).
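
    One way to see the difference (a hedged illustration; the exact plan text varies by version) is to compare the physical plan of the limited DataFrame with the RDD-conversion path that the old `collect()` went through:

```python
# The limited DataFrame on its own is planned with a driver-side CollectLimit:
df.limit(1).explain()

# The pre-patch collect() path went through an RDD conversion plus
# toLocalIterator (an illustrative stand-in for the elided conversions above);
# that buries the global limit in the middle of the plan, which is executed by
# shuffling to a single task instead of running a take() job on the driver.
rows = list(df.limit(1).rdd.toLocalIterator())
```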
    
    In order to fix this performance problem, I think that we should generalize 
the fix from SPARK-10731 / #8876 so that `DataFrame.collect()` also delegates
to the Scala implementation and shares the same performance properties. This 
patch modifies `DataFrame.collect()` to first collect all results to the driver 
and then pass them to Python, allowing this query to be planned using Spark's 
`CollectLimit` optimizations.
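
    A sketch of the shape of that change in `python/pyspark/sql/dataframe.py` (methods on the `DataFrame` class; this is an approximation using PySpark's internal helpers, not the literal diff):

```python
from pyspark.rdd import _load_from_socket
from pyspark.serializers import BatchedSerializer, PickleSerializer
from pyspark.traceback_utils import SCCallSiteSync


def take(self, num):
    """Return the first ``num`` rows, reusing collect()'s fast path."""
    return self.limit(num).collect()


def collect(self):
    """Collect rows on the JVM driver, then stream them to Python."""
    with SCCallSiteSync(self._sc) as css:
        port = self._jdf.collectToPython()
    return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
```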
    
    ## How was this patch tested?
    
    Added a regression test in `sql/tests.py` which asserts that both queries 
run the expected number of jobs, stages, and tasks.
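
    A hedged sketch of the kind of check such a test can make (not the actual code from `sql/tests.py`): run each query under its own job group, then count the jobs and stages it triggered via the status tracker. This reuses the `spark` session and `df` from the sketches above.

```python
def jobs_and_stages(sc, group, run_query):
    # Tag everything the query runs with a job group, then ask the status
    # tracker how many jobs (and stages) ended up in that group.
    sc.setJobGroup(group, group)
    run_query()
    tracker = sc.statusTracker()
    job_ids = tracker.getJobIdsForGroup(group)
    stage_ids = []
    for job_id in job_ids:
        info = tracker.getJobInfo(job_id)
        if info is not None:
            stage_ids.extend(info.stageIds)
    return len(job_ids), len(stage_ids)


sc = spark.sparkContext
take_counts = jobs_and_stages(sc, "take-1", lambda: df.take(1))
limit_counts = jobs_and_stages(sc, "limit-1-collect", lambda: df.limit(1).collect())

# With this patch applied, both queries should schedule the same (small)
# number of jobs and stages.
assert take_counts == limit_counts
```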

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark pyspark-collect-limit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15068.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15068
    
----
commit 44984a7e37e2d8635e8c461031a182fd7a8d41ce
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-09-13T00:36:56Z

    Add regression test.

commit 6fa9d9278de365e0cbb6ccc779d887bfe0db46f6
Author: Josh Rosen <joshro...@databricks.com>
Date:   2016-09-12T23:08:06Z

    Implement PySpark take as limit + collect.

----

