Please send a PR. Thanks for looking at this.

On Thu, Nov 16, 2017 at 7:27 AM Andrew Andrade <and...@andrewandrade.ca>
wrote:

> Hello devs,
>
> I know a lot of great work has been done recently on pandas-to-Spark
> DataFrame conversion (and vice versa) using Apache Arrow, but I hit a
> specific pain point on a low-memory setup without Arrow.
>
> Specifically, I was hitting a driver OOM when running toPandas() on a small
> dataset (<100 MB compressed).  There was discussion about toPandas being
> slow
> <http://apache-spark-developers-list.1001551.n3.nabble.com/toPandas-very-slow-td16794.html>
> in March 2016 due to a self.collect().  A solution was found that builds a pandas
> DataFrame or NumPy array per partition using mapPartitions
> <https://gist.github.com/joshlk/871d58e01417478176e7>, but it was never
> merged back into dataframe.py.
>
> I understand that using Apache Arrow solves this, but in a setup
> without Arrow (like the one where I hit the pain point), I investigated the
> memory usage of toPandas versus to_pandas (one pandas DataFrame per partition)
> and played with the number of partitions.  The findings are here
> <https://gist.github.com/mrandrewandrade/7f5ff26c5275376d3cd5e427ca74d50f>
> .
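>
> As a rough illustration of the kind of measurement involved (not the exact
> code from the gist; `sdf` below is just a placeholder DataFrame, and
> ru_maxrss only approximates peak memory in the Python driver process):
>
> import resource
>
> def driver_rss_mb():
>     # ru_maxrss is reported in kilobytes on Linux.
>     return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0
>
> baseline = driver_rss_mb()
> pdf = sdf.toPandas()  # stock toPandas(); swap in the per-partition version to compare
> print("driver RSS delta: %.1f MB" % (driver_rss_mb() - baseline))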
>
> The summary of the findings is that on a 147 MB dataset, toPandas memory
> usage was about 784 MB, while doing it partition by partition (with 100
> partitions) had an overhead of only 76.30 MB and took almost half the time to
> run.  I realize that Arrow solves this, but the modification is quite small
> and would greatly assist anyone who isn't able to use Arrow.
>
> Would a PR [1] from me to address this issue be welcome?
>
> Thanks,
>
> Andrew
>
> [1] From Josh's Gist
>
> import pandas as pd
>
> def _map_to_pandas(rdds):
>     """ Needs to be here due to pickling issues """
>     # Build a one-element list holding a pandas DataFrame for this partition.
>     return [pd.DataFrame(list(rdds))]
>
> def toPandas(df, n_partitions=None):
>     """
>     Returns the contents of `df` as a local `pandas.DataFrame` in a speedy
>     fashion. The DataFrame is repartitioned if `n_partitions` is passed.
>     :param df:              pyspark.sql.DataFrame
>     :param n_partitions:    int or None
>     :return:                pandas.DataFrame
>     """
>     if n_partitions is not None:
>         df = df.repartition(n_partitions)
>     # Build one pandas DataFrame per partition on the executors, then collect
>     # and concatenate them on the driver.
>     df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
>     df_pand = pd.concat(df_pand)
>     df_pand.columns = df.columns
>     return df_pand
>
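> For reference, a minimal usage sketch of the function above (assuming an
> existing SparkSession named `spark`; the data is made up just to show the call):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("topandas-demo").getOrCreate()
> sdf = spark.range(0, 1000000).withColumnRenamed("id", "value")
>
> # Convert partition by partition, repartitioning to 100 partitions first.
> pdf = toPandas(sdf, n_partitions=100)
> print(pdf.shape)  # (1000000, 1)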
