[ https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823289#comment-16823289 ]

Bryan Cutler edited comment on ARROW-2590 at 4/22/19 5:55 PM:
--------------------------------------------------------------

I wasn't able to reproduce the exact error from above. When I mix up a long 
column with string data, I get:
{noformat}
pyarrow.lib.ArrowNotImplementedError: No cast implemented from string to int64{noformat}
I am using Ubuntu 18.04 and tried Spark 2.3 and master with pyarrow 0.8.0, 
0.10.0, and 0.12.1.
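To illustrate, that cast failure can be reproduced with pyarrow alone by forcing a 
string series into the declared int64 type, which is roughly what Spark's serializer 
does per returned column. This is a minimal sketch; the exact exception class may 
differ between pyarrow versions:
{code:python}
import pandas as pd
import pyarrow as pa

# Roughly what pyspark's _create_batch does for each returned column:
# convert the pandas series to an Arrow array of the declared type.
s = pd.Series(['hi'])  # string data in a column declared as long
try:
    pa.Array.from_pandas(s, type=pa.int64())
except pa.lib.ArrowException as e:
    # On pyarrow 0.12.x this is ArrowNotImplementedError:
    # "No cast implemented from string to int64"
    print(type(e).__name__, e)
{code}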

If this is a matter of mixing up columns, there was a JIRA, SPARK-24324, that 
changed column assignment to match by name by default; it is fixed in Spark 2.4.0. 
You can work around it by specifying column labels explicitly when creating the 
returned DataFrame so that they match the expected schema, for example:
{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("z string, a long", PandasUDFType.GROUPED_MAP)
def foo(_):
    # explicit column order so the labels line up with the declared schema
    return pd.DataFrame({'a': [1], 'z': ['hi']}, columns=['z', 'a']){code}
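Prior to SPARK-24324, the returned pandas DataFrame's columns are matched to the 
schema fields by position rather than by name, so without the explicit 
columns=['z', 'a'] ordering the long values in 'a' would line up with the string 
field 'z' and trigger a cast error like the one above.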
I'll close this for now, but feel free to reopen if this doesn't solve your 
issue.


> [Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-2590
>                 URL: https://issues.apache.org/jira/browse/ARROW-2590
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.9.0
>         Environment: Amazon EMR 5.13
> Spark 2.3.0
> PyArrow 0.9.0 (and 0.8.0)
> Pandas 0.22.0 (and 0.21.1)
> Numpy 1.14.1
>            Reporter: Daniel Fithian
>            Priority: Critical
>              Labels: spark
>             Fix For: 0.14.0
>
>
> I am writing a python_udf grouped map aggregation on Spark 2.3.0 in Amazon 
> EMR. When I try to run any aggregation, I get the following Python stack 
> trace:
> {noformat}
> 18/05/16 14:08:56 ERROR Utils: Aborting task
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
>     batch = _create_batch(series, self._timezone)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array
>     return pa.Array.from_pandas(s, mask=mask, type=t)
>   File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas
>   File "array.pxi", line 177, in pyarrow.lib.array
>   File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array
>   File "error.pxi", line 98, in pyarrow.lib.check_status
> pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)
> {noformat}
> To be clear, this happens when I run any aggregation, including the identity 
> aggregation (return the Pandas DataFrame that was passed in). I do not get 
> this error when I return an empty DataFrame, so it seems to be a symptom of 
> the serialization of the Pandas DataFrame back to Spark.
> I have observed this behavior with the following versions:
>  * Spark 2.3.0
>  * PyArrow 0.9.0 (also 0.8.0)
>  * Pandas 0.22.0 (also 0.22.1)
>  * Numpy 1.14.1
> Here is some sample code:
> {code:python}
> @func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
> def aggregation(df):
>     return df
>
> df.groupBy('a').apply(aggregation)  # get error
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
