Ruifeng Zheng created SPARK-41971:
-------------------------------------

             Summary: `toPandas` should support duplicate field names when arrow-optimization is on
                 Key: SPARK-41971
                 URL: https://issues.apache.org/jira/browse/SPARK-41971
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
    Affects Versions: 3.4.0
            Reporter: Ruifeng Zheng


`toPandas` supports duplicate top-level column names, but for a struct column it does not support duplicate field names when Arrow optimization is enabled:

{code:python}
In [27]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)

In [28]: spark.sql("select 1 v, 1 v").toPandas()
Out[28]: 
   v  v
0  1  1

In [29]: spark.sql("select struct(1 v, 1 v)").toPandas()
Out[29]: 
  struct(1 AS v, 1 AS v)
0                 (1, 1)

In [30]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [31]: spark.sql("select 1 v, 1 v").toPandas()
Out[31]: 
   v  v
0  1  1

In [32]: spark.sql("select struct(1 v, 1 v)").toPandas()
/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/pandas/conversion.py:204: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
  Ran out of field metadata, likely malformed
  warn(msg)
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[32], line 1
----> 1 spark.sql("select struct(1 v, 1 v)").toPandas()

File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:143, in PandasConversionMixin.toPandas(self)
    141 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
    142 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 143 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
    144     split_batches=self_destruct
    145 )
    146 if len(batches) > 0:
    147     table = pyarrow.Table.from_batches(batches)

File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
    356             results.append(batch_or_indices)
    357     else:
--> 358         results = list(batch_stream)
    359 finally:
    360     # Join serving thread and raise any exceptions from collectAsArrowToPython
    361     jsocket_auth_server.getResult()

File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:55, in ArrowCollectSerializer.load_stream(self, stream)
     50 """
     51 Load a stream of un-ordered Arrow RecordBatches, where the last iteration yields
     52 a list of indices that can be used to put the RecordBatches in the correct order.
     53 """
     54 # load the batches
---> 55 for batch in self.serializer.load_stream(stream):
     56     yield batch
     58 # load the batch order indices or propagate any error that occurred in the JVM

File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:98, in ArrowStreamSerializer.load_stream(self, stream)
     95 import pyarrow as pa
     97 reader = pa.ipc.open_stream(stream)
---> 98 for batch in reader:
     99     yield batch

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:638, in __iter__()

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:674, in pyarrow.lib.RecordBatchReader.read_next_batch()

File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()

ArrowInvalid: Ran out of field metadata, likely malformed

{code}
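
Until this is supported natively, one possible workaround (a sketch only, not the fix proposed by this issue) is to cast the struct column to a struct type whose field names have been de-duplicated before calling `toPandas`. This relies on struct-to-struct casts resolving fields by position, so only the names change; the helper `dedup_struct_ddl` below is hypothetical and assumes field names that are plain identifiers valid in a DDL string.

{code:python}
# Workaround sketch: rename duplicate struct field names via a positional
# struct-to-struct cast, mirroring how toPandas renames duplicate top-level
# columns to temporary names (see tmp_column_names in conversion.py above).
from pyspark.sql import functions as F


def dedup_struct_ddl(struct_type):
    # Hypothetical helper: build a DDL string like "struct<v:int,v_1:int>"
    # with unique field names; assumes names are plain identifiers.
    seen, parts = {}, []
    for field in struct_type.fields:
        n = seen.get(field.name, 0)
        seen[field.name] = n + 1
        name = field.name if n == 0 else "{}_{}".format(field.name, n)
        parts.append("{}:{}".format(name, field.dataType.simpleString()))
    return "struct<{}>".format(",".join(parts))


df = spark.sql("select struct(1 v, 1 v) s")
deduped = df.withColumn(
    "s", F.col("s").cast(dedup_struct_ddl(df.schema["s"].dataType))
)
deduped.toPandas()  # no ArrowInvalid with arrow.pyspark.enabled=true
{code}

Of course this only sidesteps the error by changing the field names; the proper fix is for the Arrow conversion path to tolerate (or internally rename) duplicate struct field names, the way it already renames duplicate top-level columns.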



