Hi Patrick,

It looks like it's failing in Scala before it even gets to Python to
execute your UDF, which is why it doesn't seem to matter what's in your
UDF. The trace is inside ArrowWriter, which is writing the input rows
(including the double elements of quantilelist) out to the Python worker.
Since you are doing a grouped map UDF, maybe your group sizes are too
big or skewed? Could you try to reduce the size of your groups by adding
more keys or sampling a fraction of the data? If the problem persists,
could you file a JIRA? At the very least, a better exception would be nice.
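
To make "adding more keys" concrete, here is a rough sketch, not a tested
fix. It assumes that a single Arrow buffer cannot grow past roughly 2 GiB
and that the vector grows by doubling, so it budgets 1 GiB of array data
per group; both are assumptions worth checking against your Arrow version.
The names salts_needed, with_salt, the "salt" column, and "group_key" are
made up for illustration.

```python
# Assumed budget for one group's array data: 1 GiB (an assumption, see above).
ASSUMED_BUDGET_BYTES = 2 ** 30


def salts_needed(rows_in_group, array_len, bytes_per_value=8,
                 budget=ASSUMED_BUDGET_BYTES):
    """Estimate how many sub-groups keep the array data under the budget."""
    total_bytes = rows_in_group * array_len * bytes_per_value
    return max(1, (total_bytes + budget - 1) // budget)  # ceiling division


def with_salt(df, n_salts):
    """Add a random integer column 'salt' in [0, n_salts) to a Spark DataFrame."""
    from pyspark.sql import functions as F  # imported lazily; needs PySpark
    return df.withColumn("salt", (F.rand() * n_salts).cast("int"))


# A group of 1,000,000 rows, each carrying 1000 doubles (8 bytes each):
print(salts_needed(1000000, 1000))  # prints 8
```

With that estimate, something like
with_salt(df, 8).groupby("group_key", "salt").apply(groupedPercentile)
would split each original group into about eight smaller ones, where
"group_key" stands in for whatever you group by today. Since the UDF works
row by row, splitting the groups should not change the results.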


On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy
<pmccar...@dstillery.com.invalid> wrote:

> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
> in the last stage of the job regardless of my output type.
> The problem I'm trying to solve:
> I have a column of scalar values, and each value on the same row has a
> sorted vector. I'm trying to replace each scalar value with its closest
> index from its vector. I'm applying the grouping arbitrarily and performing
> a python operation row-wise because even when the same vector appears on
> many rows it's not clear how I would get the lookup to scale.
> My input data, the product of a join of hive tables, has the following
> schema:
> root
>  |-- scalar_value: float (nullable = true)
>  |-- quantilelist: array (nullable = true)
>  |    |-- element: double (containsNull = true)
> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
> an operation on two columns, and because I want to take advantage of Arrow
> to avoid serialization.
> The schema my UDF returns is this:
> pos_schema = T.StructType([
>     T.StructField('feature_value',T.FloatType(),True),
>     T.StructField('error',T.StringType())
> ])
> ...however when I try to apply my UDF, either with saveAsTable or show(),
> I get the following exception:
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>         at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>         at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>         at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
>         at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
>         at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>         at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>         at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
> I assumed it was the result of some bad typing on my part, until I did a
> test with a degenerate UDF that only returns a column of 1:
> @F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),
>     F.PandasUDFType.GROUPED_MAP)
> def groupedPercentileInt(df):
>     return pd.DataFrame({'feature_value':[int(1)]*df.shape[0]}).reset_index(drop=True)
> This clearly only has one return value of type int, yet I get the same
> exception:
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>         at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
>         at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
>         at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
>         at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
>         at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>         at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>         at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>         at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
> What seems very strange is that it's still falling over when trying to
> work with double types, even though I'm not working with any double types.
> I tried to look into the underlying code, but I don't know Scala well
> enough to suss out the issue.
> Is this a bug?
> My UDF:
> @F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
> def groupedPercentile(df):
>     """
>     Pandas UDF to apply binary search for a group of records.
>     """
>     def getPercentile(x):
>         """
>         Given a scalar v and a 1000-length vector of quantiles q
>         produce the percentile of the distribution most closely
>         corresponding to v's position in q
>         """
>         v = x['scalar_value']
>         q = x['quantilelist']
>         # the vector is length 1000 so for the sake of simplicity
>         # we're going to pretend it's actually 1024
>         q1024 = []
>         q1024.extend(q.tolist())
>         q1024.extend([q[-1]]*24)
>         start = 0
>         end = 1024
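>         # stop at width 1: there half_len is 0, so the else branch
>         # would leave start and end unchanged and loop forever
>         while end - start > 1:
>             half_len = (end - start) // 2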
>             if v > q1024[start + half_len]:
>                 start = (end - half_len)
>             else:
>                 end = (end - half_len)
>         if start > 1000:
>             start = 1000
>         return start
>     try:
>         df.loc[:,'feature_value'] = df.apply(getPercentile,axis=1)
>         df.loc[:,'error'] = [None]*df.shape[0]
>     except Exception as e:
>         df.loc[:,'feature_value'] = [None]*df.shape[0]
>         df.loc[:,'error'] = [str(e)]*df.shape[0]
>     finally:
>         return df[['feature_value','error']]
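
As an aside on the lookup itself, the hand-rolled binary search in the UDF
above could probably be replaced by the standard library's bisect module.
The sketch below assumes that "position of v in q" means the number of
quantiles strictly less than v, which may not be exactly the definition
wanted here; percentile_position and the short list q are hypothetical
stand-ins for the real 1000-length vector.

```python
from bisect import bisect_left


def percentile_position(v, quantiles):
    """Return how many entries of the sorted sequence `quantiles` are
    strictly less than v (a value from 0 to len(quantiles))."""
    return bisect_left(quantiles, v)


# Hypothetical 5-point quantile list standing in for the 1000-length vector.
q = [0.1, 0.2, 0.4, 0.8, 1.6]
print(percentile_position(0.05, q))  # below every quantile: prints 0
print(percentile_position(0.4, q))   # equal to q[2]: prints 2
print(percentile_position(9.0, q))   # above every quantile: prints 5
```

No padding to 1024 is needed because bisect_left handles any length, and
the result is already capped at len(quantiles). numpy.searchsorted offers
the same kind of lookup for NumPy arrays.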
