Hello devs,

I know a lot of great work has been done recently on converting between pandas and Spark DataFrames using Apache Arrow, but I hit a specific pain point on a low-memory setup without Arrow.
Specifically, I was hitting a driver OOM when running toPandas() on a small dataset (<100 MB compressed).

There was a discussion about toPandas being slow <http://apache-spark-developers-list.1001551.n3.nabble.com/toPandas-very-slow-td16794.html> in March 2016 due to a self.collect(). A solution was found that builds a pandas DataFrame (or NumPy array) per partition using mapPartitions <https://gist.github.com/joshlk/871d58e01417478176e7>, but it was never merged back into dataframe.py.

I understand that using Apache Arrow will solve this, but in a setup without Arrow (like the one where I faced the pain point), I investigated the memory usage of toPandas versus the per-partition to_pandas and played with the number of partitions. The findings are here <https://gist.github.com/mrandrewandrade/7f5ff26c5275376d3cd5e427ca74d50f>. In summary, on a 147 MB dataset, toPandas used about 784 MB of driver memory, while doing it partition by partition (with 100 partitions) had an overhead of only 76.30 MB and took almost half the time to run.

I realize that Arrow solves this, but the modification is quite small and would greatly help anyone who isn't able to use Arrow. Would a PR [1] from me to address this issue be welcome?

Thanks,
Andrew

[1] From Josh's gist:

    import pandas as pd

    def _map_to_pandas(rdds):
        """ Needs to be here due to pickling issues """
        return [pd.DataFrame(list(rdds))]

    def toPandas(df, n_partitions=None):
        """
        Returns the contents of `df` as a local `pandas.DataFrame` in a
        speedy fashion. The DataFrame is repartitioned if `n_partitions`
        is passed.

        :param df: pyspark.sql.DataFrame
        :param n_partitions: int or None
        :return: pandas.DataFrame
        """
        if n_partitions is not None:
            df = df.repartition(n_partitions)
        # Build one pandas DataFrame per partition on the executors,
        # collect the small frames, and concatenate them on the driver.
        df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
        df_pand = pd.concat(df_pand)
        df_pand.columns = df.columns
        return df_pand
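For context, here is a minimal usage sketch of the function from [1]. It assumes a running SparkSession and that the gist's toPandas and _map_to_pandas are in scope; the names spark and df, the path "example.parquet", and the choice of 100 partitions are illustrative, not from the benchmark code in the gist.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("example.parquet")  # placeholder dataset

    # Built-in path (without Arrow): collect() first pulls every Row to
    # the driver, then one big pandas DataFrame is built from them, so
    # both representations coexist in driver memory at the peak.
    pdf_builtin = df.toPandas()

    # Partition-wise path: each executor converts its own partition to a
    # pandas DataFrame, and the driver only concatenates the pieces.
    pdf_partitioned = toPandas(df, n_partitions=100)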