[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250749#comment-17250749 ]

Liya Fan commented on ARROW-4890:
---------------------------------

[~dishka_krauch] It is difficult to find the cause without more details about 
the problem.
The possible reasons that come to mind include:
1. If you are using Flight for IPC, buffers larger than 2 GB are not supported, 
as restricted by gRPC.
2. Make sure you are using UnsafeAllocationManager as the allocation manager 
(not NettyAllocationManager); see the sketch below.
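
For point 2, here is a minimal sketch of how the allocation manager could be 
selected from PySpark. The arrow.allocation.manager.type system property and 
its "Unsafe" value are assumptions based on recent Arrow Java releases, so 
verify them against the Arrow version bundled with your Spark before relying 
on this:
{code:python}
import os

# Hedged sketch: request Arrow Java's Unsafe allocation manager on both
# the driver and the executors via a JVM system property. The property
# name below is an assumption; check your Arrow Java version.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--driver-java-options "-Darrow.allocation.manager.type=Unsafe" '
    '--conf spark.executor.extraJavaOptions='
    '-Darrow.allocation.manager.type=Unsafe '
    'pyspark-shell'
)
{code}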

> [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> -----------------------------------------------------------------------------
>
>                 Key: ARROW-4890
>                 URL: https://issues.apache.org/jira/browse/ARROW-4890
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.8.0
>         Environment: Cloudera cdh5.13.3
> Cloudera Spark 2.3.0.cloudera3
>            Reporter: Abdeali Kothari
>            Priority: Major
>         Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
>
> Creating this in the Arrow project as the traceback seems to suggest this is an 
> issue in Arrow.
>  This is a continuation of the conversation on the Arrow dev mailing list: 
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP pandas UDF in Spark using PySpark, I run into the 
> following error:
> {noformat}
>   File 
> "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>  line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in 
> pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> The error appears as the size of the dataset I am grouping on increases. Here 
> is a minimal code snippet that reproduces the issue.
>  Note: My actual dataset is much larger, has many more unique IDs, and is a 
> valid use case in which I cannot simplify the groupby in any way. I have 
> stripped out all the logic to make this example as simple as I could.
> {code:python}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> # 429 copies of a single row; every row carries the join key 1234567.
> pdf1 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>       columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(
>     pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
> # 48993 copies of another row with the same key, so the inner join
> # below explodes to roughly 21 million rows, all in a single group.
> pdf2 = pd.DataFrame(
>       [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno",
>         "abcdefghijklmno"]],
>       columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(
>     pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
> # Identity UDF: the failure happens while moving the group through
> # Arrow, not in any transformation logic.
> def myudf(df):
>     return df
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have also tried running this on Amazon EMR with Spark 2.3.1 and 20 GB of 
> RAM per executor.
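> To check whether a single oversized group is what tips this over, here is a 
> rough diagnostic sketch. It reuses df3 and F from the snippet above and 
> assumes a recent pyarrow in which Table.nbytes is available:
> {code:python}
> import pyarrow as pa
> # Find the key with the most rows; with the data above there is only one
> # key, so the single group holds all ~21 million joined rows.
> biggest = df3.groupBy('df1_c1').count().orderBy(F.desc('count')).first()
> print('largest group:', biggest['df1_c1'], 'rows:', biggest['count'])
> # Pull that group to the driver and measure its Arrow footprint. A value
> # near or above 2**31 bytes would overflow a signed 32-bit length, which
> # matches the "read length must be positive or -1" symptom. Note that
> # collecting the group locally may itself need a lot of driver memory.
> group_pdf = df3.filter(df3['df1_c1'] == biggest['df1_c1']).toPandas()
> table = pa.Table.from_pandas(group_pdf)
> print('approx Arrow bytes for largest group:', table.nbytes)
> {code}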



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
