This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f57f7a7  [SPARK-31441] Support duplicated column names for toPandas with arrow execution
f57f7a7 is described below

commit f57f7a766d30854aaba72068dcfbee7acfa50cc5
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Tue Apr 14 14:08:56 2020 +0900

[SPARK-31441] Support duplicated column names for toPandas with arrow execution

### What changes were proposed in this pull request?

This PR adds support for duplicated column names in `toPandas` with Arrow execution: the columns are renamed to unique temporary names before the Arrow batches are collected, and the resulting pandas DataFrame is renamed back to the original column names.
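[Editor's note] For illustration, the same rename-and-restore idea can be sketched at the plain PyArrow/pandas level, without Spark. This is a minimal sketch, not the patch itself (the patch renames on the Spark side via `toDF(*tmp_column_names)` before collecting); the names `table`, `tmp_names`, and `original_names` are illustrative, and whether the direct `to_pandas()` call fails depends on the installed PyArrow version:

```py
import pyarrow as pa

# A table whose column names collide, like the result of "select 1 v, 1 v".
table = pa.Table.from_arrays([pa.array([1]), pa.array([1])], names=['v', 'v'])

# On affected PyArrow versions, table.to_pandas() can raise
# "ValueError: Found non-unique column index" here (see the traceback below).

# Instead, convert under unique temporary names, then restore the originals;
# pandas itself allows duplicate column labels.
original_names = table.column_names
tmp_names = ['col_{}'.format(i) for i in range(len(original_names))]
pdf = table.rename_columns(tmp_names).to_pandas()
pdf.columns = original_names

print(pdf)
#    v  v
# 0  1  1
```

The round-trip is harmless either way: if the direct conversion would have succeeded, renaming and then restoring the names yields the same frame.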
### Why are the changes needed?

When we execute `toPandas()` with Arrow execution enabled, it fails if the column names have duplicates:

```py
>>> spark.sql("select 1 v, 1 v").toPandas()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 2132, in toPandas
    pdf = table.to_pandas()
  File "pyarrow/array.pxi", line 441, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 653, in table_to_blockmanager
    columns = _deserialize_column_index(table, all_columns, column_indexes)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 704, in _deserialize_column_index
    columns = _flatten_single_level_multiindex(columns)
  File "/path/to/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 937, in _flatten_single_level_multiindex
    raise ValueError('Found non-unique column index')
ValueError: Found non-unique column index
```

### Does this PR introduce any user-facing change?

Yes. Previously we would see the error above; after this PR, we get the expected result:

```py
>>> spark.sql("select 1 v, 1 v").toPandas()
   v  v
0  1  1
```

### How was this patch tested?

Added and modified related tests.

Closes #28210 from ueshin/issues/SPARK-31441/to_pandas.

Authored-by: Takuya UESHIN <ues...@databricks.com>
Signed-off-by: HyukjinKwon <gurwls...@apache.org>
(cherry picked from commit 87be3641eb3517862dd5073903d5b37275852066)
Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/conversion.py    |  6 +++++-
 python/pyspark/sql/tests/test_dataframe.py | 27 +++++++++++++++++++++------
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 47cf8bb..251625a 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -103,13 +103,17 @@ class PandasConversionMixin(object):
             try:
                 from pyspark.sql.pandas.types import _check_series_localize_timestamps
                 import pyarrow
-                batches = self._collect_as_arrow()
+                # Rename columns to avoid duplicated column names.
+                tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
                     # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pdf = table.to_pandas(date_as_object=True)
+                    # Rename back to the original column names.
+                    pdf.columns = self.columns
                     for field in self.schema:
                         if isinstance(field.dataType, TimestampType):
                             pdf[field.name] = \
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d9dcbc0..738c984 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -530,6 +530,19 @@ class DataFrameTests(ReusedSQLTestCase):
         self.assertEquals(types[5], 'datetime64[ns]')
 
     @unittest.skipIf(not have_pandas, pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
 
@@ -540,12 +553,14 @@ class DataFrameTests(ReusedSQLTestCase):
             select explode(sequence(1, 3)) v
         ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org