Abdeali Kothari created ARROW-4890:
--------------------------------------
Summary: Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
Key: ARROW-4890
URL: https://issues.apache.org/jira/browse/ARROW-4890
Project: Apache Arrow
Issue Type: Bug
Components: Python
Affects Versions: 0.8.0
Environment: Cloudera cdh5.13.3
Cloudera Spark 2.3.0.cloudera3
Reporter: Abdeali Kothari
Creating this in the Arrow project as the traceback seems to suggest this is an issue in Arrow. It is a continuation of the conversation on the [email list|https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E].
When I run a GROUPED_MAP pandas UDF in Spark using PySpark, I run into the following error once the dataset I am grouping on grows large enough:
{noformat}
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 265, in __iter__
  File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1
{noformat}
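For reference, the failing frames sit in pyarrow's record-batch stream reader, which the PySpark worker uses to deserialize each group. Below is a minimal sketch of that read path outside of Spark, written against the current pyarrow.ipc API (which may differ from the 0.8.0 API); it does not reproduce the failure, it only illustrates the layer the traceback points at:
{code}
import pyarrow as pa
import pyarrow.ipc as ipc

# Serialize one record batch with the streaming IPC format, the same format
# Spark uses to ship each group to the Python worker.
batch = pa.record_batch([pa.array([1, 2, 3])], names=["x"])
sink = pa.BufferOutputStream()
with ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# Iterating the stream reader goes through read_next_batch(), the call that
# raises the ArrowIOError shown in the traceback above.
reader = ipc.open_stream(sink.getvalue())
for b in reader:
    print(b.num_rows)
{code}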
Here is a minimal code snippet that reproduces the issue in Spark.
Note: My actual dataset is much larger and has many more unique IDs; it is a valid use case in which I cannot simplify the groupby in any way. I have stripped out all the logic to make this example as simple as I could.
{code}
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'

import findspark
findspark.init()

import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# 429 identical rows, all with the same key in df1_c1
pdf1 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
    columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
)
df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

# 48993 identical rows, all with the same key in df2_c1
pdf2 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
    columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
)
df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')

# Inner join on the single shared key value
df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

# Identity GROUPED_MAP UDF: all logic stripped out, each group is returned unchanged
def myudf(df):
    return df

df4 = df3
udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
df5 = df4.groupBy('df1_c1').apply(udf)
print('df5.count()', df5.count())
# df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
{code}
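For a sense of the scale this produces: every row in both inputs carries the same key (1234567), so the inner join yields a single group that the GROUPED_MAP UDF must receive in one piece. A rough back-of-the-envelope estimate of that group's size (my own estimate; the bytes-per-row figure is a guess, not a measurement):
{code}
# Hypothetical size estimate for the single group produced by the join above.
rows = 429 * 48993            # all rows share one join key, so one group
approx_bytes_per_row = 120    # rough guess for the ten columns of df3
print(rows, "rows, ~%.1f GB" % (rows * approx_bytes_per_row / 1e9))
{code}
If that estimate is anywhere near right, the single group is on the order of 2 GB or more, which may matter given that the failure happens while the worker reads the group back from the Arrow stream.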
I have also tried running this on Amazon EMR with Spark 2.3.1 and 20 GB of RAM per executor.