[ https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16823143#comment-16823143 ]
Wes McKinney commented on ARROW-2590:
-------------------------------------

Thanks, hopefully [~bryanc] or someone more intimately familiar with the details can reproduce the issue.

> [Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-2590
>                 URL: https://issues.apache.org/jira/browse/ARROW-2590
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.9.0
>         Environment: Amazon EMR 5.13
>                      Spark 2.3.0
>                      PyArrow 0.9.0 (and 0.8.0)
>                      Pandas 0.22.0 (and 0.21.1)
>                      Numpy 1.14.1
>            Reporter: Daniel Fithian
>            Priority: Critical
>              Labels: spark
>             Fix For: 0.14.0
>
> I am writing a python_udf grouped map aggregation on Spark 2.3.0 in Amazon EMR. When I try to run any aggregation, I get the following Python stack trace:
>
> {code}
> 18/05/16 14:08:56 ERROR Utils: Aborting task
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
>     batch = _create_batch(series, self._timezone)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
>     arrs = [create_array(s, t) for s, t in series]
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array
>     return pa.Array.from_pandas(s, mask=mask, type=t)
>   File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas
>   File "array.pxi", line 177, in pyarrow.lib.array
>   File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array
>   File "error.pxi", line 98, in pyarrow.lib.check_status
> pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)
> {code}
>
> To be clear, this happens when I run any aggregation, including the identity aggregation (return the Pandas DataFrame that was passed in). I do not get this error when I return an empty DataFrame, so it seems to be a symptom of the serialization of the Pandas DataFrame back to Spark.
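For context on where this dies: the bottom frames of the trace show PySpark's Arrow serializer converting each column of the returned Pandas DataFrame with pa.Array.from_pandas. A minimal sketch of that call site, run outside Spark (the Series contents and the string type are hypothetical stand-ins, not the reporter's data; on clean data the call succeeds):

{code:python}
import pandas as pd
import pyarrow as pa

# Stand-in for one column of the DataFrame returned by the grouped-map UDF.
s = pd.Series(["foo", "bar", None])

# The call that raises in the trace (create_array in pyspark/serializers.py);
# Spark passes the Arrow type declared for the column, here a string type.
arr = pa.Array.from_pandas(s, type=pa.string())
print(arr)
{code}

The 'utf-32-le' codec message suggests the failure occurs while Arrow decodes a NumPy fixed-width unicode buffer (NumPy stores str data as UTF-32), which would explain why it only surfaces when a non-empty DataFrame is serialized back.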
> I have observed this behavior with the following versions:
> * Spark 2.3.0
> * PyArrow 0.9.0 (also 0.8.0)
> * Pandas 0.22.0 (also 0.22.1)
> * Numpy 1.14.1
>
> Here is some sample code:
>
> {code:python}
> @func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
> def aggregation(df):
>     return df
>
> df.groupBy('a').apply(aggregation)  # get error
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
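The reporter's snippet assumes SCHEMA and df already exist. For anyone trying to reproduce this, here is a self-contained sketch of the same identity grouped-map aggregation on Spark 2.3.x; the schema, column names, and sample rows are hypothetical stand-ins, not the reporter's data:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("arrow-2590-repro").getOrCreate()

# Hypothetical schema; the reporter's actual SCHEMA is not given in the issue.
SCHEMA = StructType([
    StructField("a", StringType()),
    StructField("b", LongType()),
])

@func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
def aggregation(df):
    # Identity aggregation: return the input pandas DataFrame unchanged.
    return df

# Hypothetical input rows.
df = spark.createDataFrame([("x", 1), ("x", 2), ("y", 3)], schema=SCHEMA)

# On the affected EMR/PyArrow setup this reportedly fails while serializing
# the grouped results back to Spark; elsewhere it should just echo the input.
df.groupBy("a").apply(aggregation).show()
{code}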