Thanks Bryan. I think it was ultimately that the groups were too large - after setting spark.sql.shuffle.partitions to a much higher number I was able to get the UDF to execute.
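For anyone who hits the same OversizedAllocationException, a rough sketch of the change that worked for me (the partition count of 2000 and the grouping column name are only illustrative; pick values that fit your own data):

    # raise the shuffle partition count before applying the grouped-map pandas UDF
    spark.conf.set("spark.sql.shuffle.partitions", "2000")

    result = (df
              .groupby("group_key")        # hypothetical grouping column
              .apply(groupedPercentile))   # the GROUPED_MAP UDF quoted below

If that alone isn't enough, Bryan's suggestion below of adding more keys (or sampling) to shrink the individual groups is worth trying as well.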
On Fri, Jul 20, 2018 at 12:45 AM, Bryan Cutler <cutl...@gmail.com> wrote:

> Hi Patrick,
>
> It looks like it's failing in Scala before it even gets to Python to
> execute your udf, which is why it doesn't seem to matter what's in your
> udf. Since you are doing a grouped map udf, maybe your group sizes are too
> big or skewed? Could you try to reduce the size of your groups by adding
> more keys or sampling a fraction of the data? If the problem persists,
> could you make a JIRA? At the very least a better exception would be nice.
>
> Bryan
>
> On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy
> <pmccar...@dstillery.com.invalid> wrote:
>
>> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>>
>> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
>> in the last stage of the job regardless of my output type.
>>
>> The problem I'm trying to solve:
>> I have a column of scalar values, and each row also has a sorted vector.
>> I'm trying to replace each scalar value with its closest index from its
>> vector. I'm applying the grouping arbitrarily and performing a Python
>> operation row-wise because, even when the same vector appears on many
>> rows, it's not clear how I would get the lookup to scale.
>>
>> My input data, the product of a join of Hive tables, has the following
>> schema:
>>
>> root
>>  |-- scalar_value: float (nullable = true)
>>  |-- quantilelist: array (nullable = true)
>>  |    |-- element: double (containsNull = true)
>>
>> My UDF is at the bottom. I'm using a GROUPED_MAP UDF because I want to
>> perform an operation on two columns, and because I want to take advantage
>> of Arrow to avoid serialization.
>>
>> The schema my UDF returns is this:
>>
>> pos_schema = T.StructType([
>>     T.StructField('feature_value', T.FloatType(), True),
>>     T.StructField('error', T.StringType())
>> ])
>>
>> ...however when I try to apply my UDF, either with saveAsTable or show(),
>> I get the following exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>>     at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>>     at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>>     at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
>>     at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
>>     at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
>>     at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>>     at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
>>     at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>>     at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>>     at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>>     at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>>
>> I assumed it was the result of some bad typing on my part, until I did a
>> test with a degenerate UDF that only returns a column of 1:
>>
>> @F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
>>               F.PandasUDFType.GROUPED_MAP)
>> def groupedPercentileInt(df):
>>     return pd.DataFrame({'feature_value': [int(1)]*df.shape[0]}).reset_index(drop=True)
>>
>> This clearly has only one return value of type int, yet I get the same
>> exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>>     at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>>     at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>>     [... remainder of the stack trace is identical to the one above ...]
>>
>> What seems very strange is that it's still falling over when trying to
>> work with double types, even though I'm not working with any double types.
>> I tried to look into the underlying code, but I don't know Scala well
>> enough to suss out the issue.
>>
>> Is this a bug?
>>
>> My UDF:
>>
>> @F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
>> def groupedPercentile(df):
>>     """
>>     Pandas UDF to apply binary search for a group of records.
>> """ >> >> def getPercentile(x): >> """ >> Given a scalar v and a 1000-length vector of quantiles q >> produce the percentile of the distribution most closely >> corresponding to v's position in q >> """ >> >> v = x['scalar_value'] >> q = x['quantilelist'] >> >> # the vector is length 1000 so for the sake of simplicity >> # we're going to pretend it's actually 1024 >> >> q1024 = [] >> q1024.extend(q.tolist()) >> q1024.extend([q[-1]]*24) >> >> start = 0 >> end = 1024 >> >> while start != end: >> >> half_len = int((end - start) / 2) >> >> if v > q1024[start + half_len]: >> start = (end - half_len) >> else: >> end = (end - half_len) >> >> if start > 1000: >> start = 1000 >> >> return start >> >> try: >> df.loc[:,'feature_value'] = df.apply(getPercentile,axis=1) >> df.loc[:,'error'] = [None]*df.shape[0] >> >> except Exception as e: >> df.loc[:,'feature_value'] = [None]*df.shape[0] >> df.loc[:,'error'] = [str(e)]*df.shape[0] >> >> finally: >> return df[['feature_value','error']] >> >