[ 
https://issues.apache.org/jira/browse/SPARK-41037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17647878#comment-17647878
 ] 

Hyukjin Kwon commented on SPARK-41037:
--------------------------------------

This seems to be fixed in https://issues.apache.org/jira/browse/ARROW-17832

> Fix pandas_udf when return type is array of MapType working properly.
> ---------------------------------------------------------------------
>
>                 Key: SPARK-41037
>                 URL: https://issues.apache.org/jira/browse/SPARK-41037
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: Haejoon Lee
>            Priority: Major
>
> The current behavior of `pandas_udf` is not working properly for the case 
> below:
> {code:java}
> df = spark.createDataFrame([["Testing_123", 9.6],["123_Testing", 
> 10.4]]).toDF("Text", "Double")
> @pandas_udf(ArrayType(MapType(StringType(),StringType())))
> def test_udf(x : pd.Series) -> pd.Series:
>     return x.transform(lambda x: [{"string" : y} for y in x.split("_")])
> >>> df.withColumn("test_col", test_udf("Text")).show()
> 22/11/08 12:22:51 ERROR Executor: Exception in task 10.0 in stage 2.0 (TID 
> 15)1]
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "pyarrow/array.pxi", line 1037, in pyarrow.lib.Array.from_pandas
>   File "pyarrow/array.pxi", line 313, in pyarrow.lib.array
>   File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
> pyarrow.lib.ArrowTypeError: Could not convert {'string': '123'} with type 
> dict: was not a sequence or recognized null for conversion to list type    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) 
> ~[scala-library-2.12.17.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
> ~[scala-library-2.12.17.jar:?]
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source) ~[?:?]
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.scheduler.Task.run(Task.scala:139) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:833) ~[?:?]
> 22/11/08 12:22:51 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "pyarrow/array.pxi", line 1037, in pyarrow.lib.Array.from_pandas
>   File "pyarrow/array.pxi", line 313, in pyarrow.lib.array
>   File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
> pyarrow.lib.ArrowTypeError: Could not convert {'string': 'Testing'} with type 
> dict: was not a sequence or recognized null for conversion to list type    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) 
> ~[scala-library-2.12.17.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
> ~[scala-library-2.12.17.jar:?]
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source) ~[?:?]
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
>  ~[spark-sql_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.scheduler.Task.run(Task.scala:139) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>  ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) 
> ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:833) ~[?:?]
> 22/11/08 12:22:51 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 15) 
> (172.30.1.64 executor driver): org.apache.spark.api.python.PythonException: 
> Traceback (most recent call last):
>   File "pyarrow/array.pxi", line 1037, in pyarrow.lib.Array.from_pandas
>   File "pyarrow/array.pxi", line 313, in pyarrow.lib.array
>   File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
> pyarrow.lib.ArrowTypeError: Could not convert {'string': '123'} with type 
> dict: was not a sequence or recognized null for conversion to list type    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
>     at 
> org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
>     at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)22/11/08 12:22:51 ERROR 
> TaskSetManager: Task 10 in stage 2.0 failed 1 times; aborting job
> 22/11/08 12:22:51 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 7) 
> (172.30.1.64 executor driver): org.apache.spark.api.python.PythonException: 
> Traceback (most recent call last):
>   File "pyarrow/array.pxi", line 1037, in pyarrow.lib.Array.from_pandas
>   File "pyarrow/array.pxi", line 313, in pyarrow.lib.array
>   File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
> pyarrow.lib.ArrowTypeError: Could not convert {'string': 'Testing'} with type 
> dict: was not a sequence or recognized null for conversion to list type    at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
>     at 
> org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
>     at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>  Source)
>     at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
>     at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
>     at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>     at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)Traceback (most recent 
> call last):
>   File "<stdin>", line 1, in <module>
>   File 
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/dataframe.py", 
> line 849, in show
>     print(self._jdf.showString(n, 20, vertical))
>   File 
> "/Users/haejoon.lee/Desktop/git_store/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
>  line 1322, in __call__
>   File 
> "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/sql/utils.py", 
> line 205, in deco
>     raise converted from None
> pyspark.sql.utils.PythonException:
>   An exception was thrown from the Python worker. Please see the stack trace 
> below.
> Traceback (most recent call last):
>   File "pyarrow/array.pxi", line 1037, in pyarrow.lib.Array.from_pandas
>   File "pyarrow/array.pxi", line 313, in pyarrow.lib.array
>   File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
> pyarrow.lib.ArrowTypeError: Could not convert {'string': '123'} with type 
> dict: was not a sequence or recognized null for conversion to list type{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to