The 2G limit that Uwe mentioned definitely exists, Spark serialize each
group as a single RecordBatch currently.

The "pyarrow.lib.ArrowIOError: read length must be positive or -1" is
strange, I think Spark is on an older version of the Java side (0.10 for
Spark 2.4 and 0.8 for Spark 2.3). I forgot whether there is binary
incompatibility between these versions and pyarrow 0.12.

On Fri, Mar 1, 2019 at 3:32 PM Abdeali Kothari <abdealikoth...@gmail.com>
wrote:

> Forgot to mention: The above testing is with 0.11.1
> I tried 0.12.1 as you suggested - and am getting the
> OversizedAllocationException with the 80char column. And getting read
> length must be positive or -1 without that. So, both the issues are
> reproducible with pyarrow 0.12.1
>
> On Sat, Mar 2, 2019 at 1:57 AM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
> > That was spot on!
> > I had 3 columns with 80characters => 80*21*10^6 = 1.56 bytes
> > I removed these columns and replaced each with 10 doubleType columns (so
> > it would still be 80 bytes of data) - and this error didn't come up
> anymore.
> > I also removed all the other columns and just kept 1 column with
> > 80characters - I got the error again.
> >
> > I'll make a simpler example and report it to spark - as I guess these
> > columns would need some special handling.
> >
> > Now, when I run - I get a different error:
> > 19/03/01 20:16:49 WARN TaskSetManager: Lost task 108.0 in stage 8.0 (TID
> > 12, ip-172-31-10-249.us-west-2.compute.internal, executor 1):
> > org.apache.spark.api.python.PythonException: Traceback (most recent call
> > last):
> >   File
> >
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py",
> > line 230, in main
> >     process()
> >   File
> >
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py",
> > line 225, in process
> >     serializer.dump_stream(func(split_index, iterator), outfile)
> >   File
> >
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py",
> > line 260, in dump_stream
> >     for series in iterator:
> >   File
> >
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py",
> > line 279, in load_stream
> >     for batch in reader:
> >   File "pyarrow/ipc.pxi", line 265, in __iter__
> >   File "pyarrow/ipc.pxi", line 281, in
> > pyarrow.lib._RecordBatchReader.read_next_batch
> >   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > pyarrow.lib.ArrowIOError: read length must be positive or -1
> >
> > Again, any pointers on what this means and what it indicates would be
> > really useful for me.
> >
> > Thanks for the replies!
> >
> >
> > On Fri, Mar 1, 2019 at 11:26 PM Uwe L. Korn <uw...@xhochy.com> wrote:
> >
> >> There is currently the limitation that a column in a single RecordBatch
> >> can only hold 2G on the Java side. We work around this by splitting the
> >> DataFrame under the hood into multiple RecordBatches. I'm not familiar
> with
> >> the Spark<->Arrow code but I guess that in this case, the Spark code can
> >> only handle a single RecordBatch.
> >>
> >> Probably it is best to construct a https://stackoverflow.com/help/mcve
> >> and create an issue with the Spark project. Most likely this is not a
> bug
> >> in Arrow but just requires a bit more complicated implementation around
> the
> >> Arrow libs.
> >>
> >> Still, please have a look at the exact size of your columns. We support
> >> 2G per column, if it is only 1.5G, then there is probably a rounding
> error
> >> in the Arrow. Alternatively, you might also be in luck that the
> following
> >> patch
> >>
> https://github.com/apache/arrow/commit/bfe6865ba8087a46bd7665679e48af3a77987cef
> >> which is part of Apache Arrow 0.12 already fixes your problem.
> >>
> >> Uwe
> >>
> >> On Fri, Mar 1, 2019, at 6:48 PM, Abdeali Kothari wrote:
> >> > Is there a limitation that a single column cannot be more than 1-2G ?
> >> > One of my columns definitely would be around 1.5GB of memory.
> >> >
> >> > I cannot split my DF into more partitions as I have only 1 ID and I'm
> >> > grouping by that ID.
> >> > So, the UDAF would only run on a single pandasDF
> >> > I do have a requirement to make a very large DF for this UDAF (8GB as
> i
> >> > mentioned above) - trying to figure out what I need to do here to make
> >> this
> >> > work.
> >> > Increasing RAM, etc. is no issue (i understand I'd need huge executors
> >> as I
> >> > have a huge data requirement). But trying to figure out how much to
> >> > actually get - cause 20GB of RAM for the executor is also erroring out
> >> > where I thought ~10GB would have been enough
> >> >
> >> >
> >> >
> >> > On Fri, Mar 1, 2019 at 10:25 PM Uwe L. Korn <uw...@xhochy.com> wrote:
> >> >
> >> > > Hello Abdeali,
> >> > >
> >> > > a problem could here be that a single column of your dataframe is
> >> using
> >> > > more than 2GB of RAM (possibly also just 1G). Try splitting your
> >> DataFrame
> >> > > into more partitions before applying the UDAF.
> >> > >
> >> > > Cheers
> >> > > Uwe
> >> > >
> >> > > On Fri, Mar 1, 2019, at 9:09 AM, Abdeali Kothari wrote:
> >> > > > I was using arrow with spark+python and when I'm trying some
> >> pandas-UDAF
> >> > > > functions I am getting this error:
> >> > > >
> >> > > > org.apache.arrow.vector.util.OversizedAllocationException: Unable
> to
> >> > > > expand
> >> > > > the buffer
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:457)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1188)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1026)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:256)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> >> > > > at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
> >> > > > at
> >> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
> >> > > > at
> >> > > >
> >> > >
> >>
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
> >> > > >
> >> > > > I was initially getting a RAM is insufficient error - and
> >> theoretically
> >> > > > (with no compression) realized that the pandas DataFrame it would
> >> try to
> >> > > > create would be ~8GB (21million records with each record having
> ~400
> >> > > > bytes). I have increased my executor memory to be 20GB per
> >> executor, but
> >> > > am
> >> > > > now getting this error from Arrow.
> >> > > > Looking for some pointers so I can understand this issue better.
> >> > > >
> >> > > > Here's what I am trying. I have 2 tables with string columns where
> >> the
> >> > > > strings always have a fixed length:
> >> > > > *Table 1*:
> >> > > >     id: integer
> >> > > >    char_column1: string (length = 30)
> >> > > >    char_column2: string (length = 40)
> >> > > >    char_column3: string (length = 10)
> >> > > >    ...
> >> > > > In total, in table1, the char-columns have ~250 characters
> >> > > >
> >> > > > *Table 2*:
> >> > > >     id: integer
> >> > > >    char_column1: string (length = 50)
> >> > > >    char_column2: string (length = 3)
> >> > > >    char_column3: string (length = 4)
> >> > > >    ...
> >> > > > In total, in table2, the char-columns have ~150 characters
> >> > > >
> >> > > > I am joining these tables by ID. In my current dataset, I have
> >> filtered
> >> > > my
> >> > > > data so only id=1 exists.
> >> > > > Table1 has ~400 records for id=1 and table2 has 50k records for
> >> id=1.
> >> > > > Hence, total number of records (after joining) for table1_join2 =
> >> 400 *
> >> > > 50k
> >> > > > = 20*10^6 records
> >> > > > Each row has ~400bytes (150+250) => overall memory = 8*10^9 bytes
> >> => ~8GB
> >> > > >
> >> > > > Now, when I try an executor with 20GB RAM, it does not work.
> >> > > > Is there some data duplicity happening internally ? What should be
> >> the
> >> > > > estimated RAM I need to give for this to work ?
> >> > > >
> >> > > > Thanks for reading,
> >> > > >
> >> > >
> >> >
> >>
> >
>

Reply via email to