[https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Daniel Fithian updated ARROW-2590:
----------------------------------
Description:
I am writing a pandas_udf grouped map aggregation on Spark 2.3.0 in Amazon EMR.
When I try to run any aggregation, I get the following Python stack trace:
{{18/05/16 14:08:56 ERROR Utils: Aborting task}}
{{org.apache.spark.api.python.PythonException: Traceback (most recent call last):}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main}}
{{    process()}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process}}
{{    serializer.dump_stream(func(split_index, iterator), outfile)}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream}}
{{    batch = _create_batch(series, self._timezone)}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch}}
{{    arrs = [create_array(s, t) for s, t in series]}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>}}
{{    arrs = [create_array(s, t) for s, t in series]}}
{{  File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array}}
{{    return pa.Array.from_pandas(s, mask=mask, type=t)}}
{{  File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas}}
{{  File "array.pxi", line 177, in pyarrow.lib.array}}
{{  File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array}}
{{  File "error.pxi", line 98, in pyarrow.lib.check_status}}
{{pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)}}
To be clear, this happens for any aggregation, including the identity
aggregation (returning the Pandas DataFrame that was passed in). I do not get
the error when I return an empty DataFrame, so the failure appears to occur
while serializing the returned Pandas DataFrame back to Spark.
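The bottom frames show the failure happening inside pyspark's create_array, which simply calls pa.Array.from_pandas on each returned column. As a minimal sketch (with made-up data, not taken from the job above), the same conversion can be exercised directly to check whether an affected node reproduces the codec error outside Spark:
{code:python}
import pandas as pd
import pyarrow as pa

# Hypothetical stand-in for one string column of the returned DataFrame.
s = pd.Series(['a', 'b', 'c'])

# Same call that serializers.py line 237 makes; on a healthy install this
# prints an Arrow string array, while on the affected EMR workers it should
# raise the utf-32-le codec error seen in the traceback.
arr = pa.Array.from_pandas(s, type=pa.string())
print(arr)
{code}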
I have observed this behavior with the following versions:
* Spark 2.3.0
* PyArrow 0.9.0 (also 0.8.0)
* Pandas 0.22.0 (also 0.22.1)
* Numpy 1.14.1
Here is some sample code:
@func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
def aggregation(df):
    return df

df.groupBy('a').apply(aggregation)  # raises the error above
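Since EMR worker nodes can load different Python packages than the driver, a quick way to confirm which versions the executors actually see is a throwaway grouped-map UDF that writes them to the executor logs and returns an empty frame (which, per the above, serializes without error). This is only a diagnostic sketch; log_versions is a hypothetical helper reusing SCHEMA, func, and df from the sample above:
{code:python}
@func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
def log_versions(df):
    import sys
    import numpy, pandas, pyarrow
    # Written to the executor's stderr, visible in the YARN container logs.
    sys.stderr.write('pyarrow=%s pandas=%s numpy=%s\n' % (
        pyarrow.__version__, pandas.__version__, numpy.__version__))
    return df.iloc[0:0]  # empty result avoids the failing serialization path

df.groupBy('a').apply(log_versions).count()  # any action to force execution
{code}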
> Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ------------------------------------------------------------------
>
> Key: ARROW-2590
> URL: https://issues.apache.org/jira/browse/ARROW-2590
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 0.9.0
> Environment: Amazon EMR 5.13
> Spark 2.3.0
> PyArrow 0.9.0 (and 0.8.0)
> Pandas 0.22.0 (and 0.21.1)
> Numpy 1.14.1
> Reporter: Daniel Fithian
> Priority: Critical
>