[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969714#comment-16969714 ]
Micah Kornfield commented on ARROW-4890:
----------------------------------------

Yes. I believe it is 2GB per shard currently.

> [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> -----------------------------------------------------------------------------
>
>                 Key: ARROW-4890
>                 URL: https://issues.apache.org/jira/browse/ARROW-4890
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.8.0
>         Environment: Cloudera cdh5.13.3
> Cloudera Spark 2.3.0.cloudera3
>            Reporter: Abdeali Kothari
>            Priority: Major
>         Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
>
> Creating this in the Arrow project as the traceback seems to suggest this is an issue in Arrow.
> Continuation of the conversation on the mailing list:
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the following error as the size of the dataset I want to group on increases:
> {noformat}
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> Here is a code snippet with which I can reproduce the issue.
> Note: My actual dataset is much larger, has many more unique IDs, and is a valid use case where I cannot simplify this groupby in any way. I have stripped out all the logic to make this example as simple as I could.
> {code:java}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
>
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> pdf1 = pd.DataFrame(
>     [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>     columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>
> pdf2 = pd.DataFrame(
>     [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>     columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>
> def myudf(df):
>     return df
>
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
>
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB of RAM per executor too.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
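Assuming the ~2GB limit Micah mentions applies to each group that Spark serializes to the Python worker as a single Arrow record batch, one common workaround is to salt the grouping key so that no single group materializes as one oversized batch. The sketch below is illustrative only: NUM_SALTS, the column names, and per_group are made up for the example, and the approach only helps when the per-group logic can be applied to each (key, salt) sub-group independently, which may not hold for the reporter's use case.

{code:python}
import pyspark
from pyspark.sql import functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Toy input: 1M rows skewed onto 10 distinct keys.
df = spark.range(0, 1000000).withColumn("key", F.col("id") % 10)

# Assumption: choose enough salts that each (key, salt) sub-group stays
# well under the ~2GB-per-Arrow-batch limit when sent to the Python worker.
NUM_SALTS = 8

salted = df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

def per_group(pdf):
    # Placeholder for the real per-group logic. It must be valid to run on
    # each (key, salt) sub-group rather than on the whole key group.
    return pdf

udf = F.pandas_udf(salted.schema, F.PandasUDFType.GROUPED_MAP)(per_group)

# Group on (key, salt) so each Arrow batch handed to the UDF stays small,
# then drop the helper column from the result.
result = salted.groupBy("key", "salt").apply(udf).drop("salt")
print(result.count())
{code}

If the per-group logic cannot be split this way, the alternative is to reduce the per-group data volume before the UDF (e.g. by selecting only the needed columns), since the underlying constraint is the size of each serialized record batch.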