Hi,

From the error it looks like this might be some sort of integer overflow, but it is hard to say. Could you try to get a minimal reproduction of the error [1] and open a JIRA issue [2] with it?
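Something along the lines of this sketch might be a starting place (the sizes, column name, and pass-through UDF are purely illustrative, assuming Spark 2.3 with pyarrow available on the workers):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()

    # A single group whose one string column totals well over 1.5 GB,
    # mimicking the data shape described further down this thread.
    df = spark.range(21 * 10**6).selectExpr(
        "1 as id", "repeat('x', 80) as char_column")

    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def identity(pdf):
        return pdf  # pass the pandas DataFrame through unchanged

    # Every row shares id=1, so the grouped-map UDF receives
    # all 21 million rows as a single batch.
    df.groupby("id").apply(identity).count()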
Thanks,
Micah

[1] https://stackoverflow.com/help/mcve
[2] https://issues.apache.org

On Sunday, March 10, 2019, Abdeali Kothari <abdealikoth...@gmail.com> wrote:

Hi, any help on this would be much appreciated. I've not been able to figure out any reason for this to happen yet.

On Sat, Mar 2, 2019, 11:50 Abdeali Kothari <abdealikoth...@gmail.com> wrote:

Hi Li Jin, thanks for the note.

I get this error only for larger data - when I reduce the number of records or the number of columns in my data it all works fine - so if it is binary incompatibility, it should be something related to large data.
I am using Spark 2.3.1 on Amazon EMR for this testing. https://github.com/apache/spark/blob/v2.3.1/pom.xml#L192 seems to indicate the Arrow version is 0.8 for this.

I installed pyarrow-0.8.0 in the Python environment on my cluster with pip and I am still getting this error. The stacktrace is very similar, just with some lines moved in the pxi files:

    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
        process()
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
        for series in iterator:
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
        for batch in reader:
      File "pyarrow/ipc.pxi", line 268, in __iter__ (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70278)
      File "pyarrow/ipc.pxi", line 284, in pyarrow.lib._RecordBatchReader.read_next_batch (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70534)
      File "pyarrow/error.pxi", line 79, in pyarrow.lib.check_status (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:8345)
    pyarrow.lib.ArrowIOError: read length must be positive or -1

Other notes:
- My data is just integers, strings, and doubles. No complex types like arrays/maps/etc.
- I don't have any NULL/None values in my data.
- Increasing executor-memory for Spark does not seem to help here.

As always, any thoughts or notes would be great so I can get some pointers on which direction to debug.

On Sat, Mar 2, 2019 at 2:24 AM Li Jin <ice.xell...@gmail.com> wrote:

The 2G limit that Uwe mentioned definitely exists; Spark currently serializes each group as a single RecordBatch.

The "pyarrow.lib.ArrowIOError: read length must be positive or -1" is strange. I think Spark is on an older version on the Java side (0.10 for Spark 2.4 and 0.8 for Spark 2.3). I forget whether there is binary incompatibility between these versions and pyarrow 0.12.
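For anyone checking for such a mismatch, the Python side can be inspected directly, while the Java side is pinned by the Spark build (a sketch; the jar listing is just one way to confirm it on a cluster):

    import pyarrow
    print(pyarrow.__version__)  # Python-side Arrow, e.g. '0.8.0'

    # The Java-side Arrow version is fixed by Spark's pom
    # (0.8.0 for Spark 2.3.1, per the link earlier in this thread).
    # On a running cluster the bundled jars show it too, e.g.:
    #   ls $SPARK_HOME/jars | grep arrow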
On Fri, Mar 1, 2019 at 3:32 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:

Forgot to mention: the above testing is with 0.11.1.
I tried 0.12.1 as you suggested, and am getting the OversizedAllocationException with the 80-char column, and "read length must be positive or -1" without it. So both issues are reproducible with pyarrow 0.12.1.

On Sat, Mar 2, 2019 at 1:57 AM Abdeali Kothari <abdealikoth...@gmail.com> wrote:

That was spot on!
I had 3 columns with 80 characters => 80 * 21*10^6 = 1.68*10^9 bytes (~1.56 GiB) per column.
I removed these columns and replaced each with 10 DoubleType columns (so it would still be 80 bytes of data per row), and this error didn't come up anymore.
I also removed all the other columns and just kept 1 column with 80 characters - I got the error again.

I'll make a simpler example and report it to Spark, as I guess these columns would need some special handling.

Now, when I run, I get a different error:

    19/03/01 20:16:49 WARN TaskSetManager: Lost task 108.0 in stage 8.0 (TID 12, ip-172-31-10-249.us-west-2.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
        process()
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
        for series in iterator:
      File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
        for batch in reader:
      File "pyarrow/ipc.pxi", line 265, in __iter__
      File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
      File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
    pyarrow.lib.ArrowIOError: read length must be positive or -1

Again, any pointers on what this means and what it indicates would be really useful for me.

Thanks for the replies!
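Spelled out, that per-column arithmetic in plain Python (the int32 offsets line reflects Arrow's variable-width layout; the conclusion is only the size estimate, not the cause of the error):

    rows = 21 * 10**6
    chars_per_value = 80

    value_bytes = rows * chars_per_value   # 1.68e9 bytes in the data buffer
    offset_bytes = (rows + 1) * 4          # int32 offsets for a var-width column

    print(value_bytes / 2**30)   # ~1.56 GiB: under 2G on paper, yet close
                                 # enough that buffer reallocation can overshoot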
On Fri, Mar 1, 2019 at 11:26 PM Uwe L. Korn <uw...@xhochy.com> wrote:

There is currently the limitation that a column in a single RecordBatch can only hold 2G on the Java side. We work around this by splitting the DataFrame under the hood into multiple RecordBatches. I'm not familiar with the Spark<->Arrow code, but I guess that in this case the Spark code can only handle a single RecordBatch.

It is probably best to construct a https://stackoverflow.com/help/mcve and create an issue with the Spark project. Most likely this is not a bug in Arrow but just requires a somewhat more complicated implementation around the Arrow libs.

Still, please have a look at the exact size of your columns. We support 2G per column; if it is only 1.5G, then there is probably a rounding error in Arrow. Alternatively, you might also be in luck that the following patch, which is already part of Apache Arrow 0.12, fixes your problem:
https://github.com/apache/arrow/commit/bfe6865ba8087a46bd7665679e48af3a77987cef

Uwe

On Fri, Mar 1, 2019, at 6:48 PM, Abdeali Kothari wrote:

Is there a limitation that a single column cannot be more than 1-2G? One of my columns definitely would be around 1.5GB of memory.

I cannot split my DF into more partitions as I have only 1 ID and I'm grouping by that ID, so the UDAF would only run on a single pandas DF.
I do have a requirement to make a very large DF for this UDAF (8GB as I mentioned above), so I am trying to figure out what I need to do here to make this work. Increasing RAM, etc. is no issue (I understand I'd need huge executors as I have a huge data requirement). But I am trying to figure out how much to actually get, because 20GB of RAM for the executor is also erroring out where I thought ~10GB would have been enough.

On Fri, Mar 1, 2019 at 10:25 PM Uwe L. Korn <uw...@xhochy.com> wrote:

Hello Abdeali,

the problem here could be that a single column of your DataFrame is using more than 2GB of RAM (possibly also just 1G). Try splitting your DataFrame into more partitions before applying the UDAF.

Cheers
Uwe
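That suggestion, sketched with illustrative names (though, as Abdeali notes above, it cannot help when every row shares one group key: a grouped-map UDF still collects each group into a single pandas DataFrame):

    # df is the joined DataFrame and my_udaf a grouped-map pandas UDF,
    # both along the lines of the sketch near the top of this thread.
    result = (df.repartition(200, "id")  # more, smaller partitions up front
                .groupBy("id")
                .apply(my_udaf))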
On Fri, Mar 1, 2019, at 9:09 AM, Abdeali Kothari wrote:

I was using Arrow with Spark+Python, and when I try some pandas-UDAF functions I am getting this error:

    org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
      at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:457)
      at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1188)
      at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1026)
      at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:256)
      at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
      at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
      at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
      at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
      at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
      at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
      at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
      at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

I was initially getting a "RAM is insufficient" error, and theoretically (with no compression) realized that the pandas DataFrame it would try to create would be ~8GB (21 million records with each record having ~400 bytes). I have increased my executor memory to 20GB per executor, but am now getting this error from Arrow.
Looking for some pointers so I can understand this issue better.

Here's what I am trying.
I have 2 tables with string columns where the strings always have a fixed length:

Table 1:
  id: integer
  char_column1: string (length = 30)
  char_column2: string (length = 40)
  char_column3: string (length = 10)
  ...
In total, in table1, the char-columns have ~250 characters.

Table 2:
  id: integer
  char_column1: string (length = 50)
  char_column2: string (length = 3)
  char_column3: string (length = 4)
  ...
In total, in table2, the char-columns have ~150 characters.

I am joining these tables by id. In my current dataset, I have filtered my data so only id=1 exists.
Table1 has ~400 records for id=1 and table2 has 50k records for id=1.
Hence, the total number of records (after joining) for table1_join2 = 400 * 50k = 20*10^6 records.
Each row has ~400 bytes (150 + 250) => overall memory = 8*10^9 bytes => ~8GB.

Now, when I try an executor with 20GB RAM, it does not work. Is there some data duplication happening internally? What should be the estimated RAM I need to give for this to work?

Thanks for reading,
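For what it's worth, that sizing estimate checks out in plain Python (the thread's own numbers; the per-column comment ties it back to the 2G Arrow vector limit discussed above):

    rows = 400 * 50000          # 20*10^6 rows after the join on id=1
    bytes_per_row = 250 + 150   # char-columns from table1 + table2

    print(rows * bytes_per_row)  # 8*10^9 bytes, i.e. the ~8GB estimate

    # The per-column view is what matters for the 2G vector limit:
    # a single 80-char column over 20*10^6 rows is 1.6*10^9 bytes alone.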