Thanks Byran. I think it was ultimately groupings that were too large -
after setting spark.sql.shuffle.partitions to a much higher number I was
able to get the UDF to execute.

On Fri, Jul 20, 2018 at 12:45 AM, Bryan Cutler <cutl...@gmail.com> wrote:

> Hi Patrick,
>
> It looks like it's failing in Scala before it even gets to Python to
> execute your udf, which is why it doesn't seem to matter what's in your
> udf. Since you are doing a grouped map udf maybe your group sizes are too
> big or skewed? Could you try to reduce the size of your groups by adding
> more keys or sampling a fraction of the data? If the problem persists could
> you make a jira? At the very least a better exception would be nice.
>
> Bryan
>
> On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy 
> <pmccar...@dstillery.com.invalid>
> wrote:
>
>> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>>
>> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
>> in the last stage of the job regardless of my output type.
>>
>>
>> The problem I'm trying to solve:
>> I have a column of scalar values, and each value on the same row has a
>> sorted vector. I'm trying to replace each scalar value with its closest
>> index from its vector. I'm applying the grouping arbitrarily and performing
>> a python operation row-wise because even when the same vector appears on
>> many rows it's not clear how I would get the lookup to scale.
>>
>> My input data, the product of a join of hive tables, has the following
>> schema:
>>
>> root
>>  |-- scalar_value: float (nullable = true)
>>  |-- quantilelist: array (nullable = true)
>>  |    |-- element: double (containsNull = true)
>>
>>
>> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to
>> perform an operation on two columns, and because I want to take advantage
>> of Arrow to avoid serialization.
>>
>> The schema my UDF returns is this:
>>
>> pos_schema = T.StructType([
>>     T.StructField('feature_value',T.FloatType(),True),
>>     T.StructField('error',T.StringType())
>> ])
>>
>> ...however when I try to apply my UDF, either with saveAsTable or show(),
>> I get the following exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
>> expand the buffer
>>         at org.apache.arrow.vector.BaseFixedWidthVector.
>> reallocBufferHelper(BaseFixedWidthVector.java:447)
>>         at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(
>> BaseFixedWidthVector.java:426)
>>         at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(
>> BaseFixedWidthVector.java:838)
>>         at org.apache.arrow.vector.Float8Vector.setSafe(
>> Float8Vector.java:221)
>>         at org.apache.spark.sql.execution.arrow.DoubleWriter.
>> setValue(ArrowWriter.scala:223)
>>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>>         at org.apache.spark.sql.execution.arrow.ArrayWriter.
>> setValue(ArrowWriter.scala:308)
>>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>>         at org.apache.spark.sql.execution.arrow.ArrowWriter.
>> write(ArrowWriter.scala:87)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(
>> ArrowPythonRunner.scala:84)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
>> scala:1380)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2.writeIteratorToStream(
>> ArrowPythonRunner.scala:95)
>>         at org.apache.spark.api.python.BasePythonRunner$WriterThread$
>> $anonfun$run$1.apply(PythonRunner.scala:215)
>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
>> scala:1991)
>>         at org.apache.spark.api.python.BasePythonRunner$WriterThread.
>> run(PythonRunner.scala:170)
>>
>> I assumed it was the result of some bad typing on my part, until I did a
>> test with a degenerate UDF that only returns a column of 1:
>>
>> @F.pandas_udf(T.StructType([T.StructField('feature_value',T.IntegerType(),True)]),
>>
>>     F.PandasUDFType.GROUPED_MAP)
>>
>> def groupedPercentileInt(df):
>>
>>     return pd.DataFrame({'feature_value':[int(1)]*df.shape[0]}).reset_
>> index(drop=True)
>>
>>
>> This clearly only has one return value of type int, yet I get the same
>> exception:
>>
>> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
>> expand the buffer
>>         at org.apache.arrow.vector.BaseFixedWidthVector.
>> reallocBufferHelper(BaseFixedWidthVector.java:447)
>>         at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(
>> BaseFixedWidthVector.java:426)
>>         at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(
>> BaseFixedWidthVector.java:838)
>>         at org.apache.arrow.vector.Float8Vector.setSafe(
>> Float8Vector.java:221)
>>         at org.apache.spark.sql.execution.arrow.DoubleWriter.
>> setValue(ArrowWriter.scala:223)
>>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>>         at org.apache.spark.sql.execution.arrow.ArrayWriter.
>> setValue(ArrowWriter.scala:308)
>>         at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(
>> ArrowWriter.scala:122)
>>         at org.apache.spark.sql.execution.arrow.ArrowWriter.
>> write(ArrowWriter.scala:87)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(
>> ArrowPythonRunner.scala:84)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$
>> 1.apply(ArrowPythonRunner.scala:75)
>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
>> scala:1380)
>>         at org.apache.spark.sql.execution.python.
>> ArrowPythonRunner$$anon$2.writeIteratorToStream(
>> ArrowPythonRunner.scala:95)
>>         at org.apache.spark.api.python.BasePythonRunner$WriterThread$
>> $anonfun$run$1.apply(PythonRunner.scala:215)
>>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
>> scala:1991)
>>         at org.apache.spark.api.python.BasePythonRunner$WriterThread.
>> run(PythonRunner.scala:170)
>>
>> What seems very strange is that it's still falling over when trying to
>> work with double types, even though I'm not working with any double types.
>> I tried to look into the underlying code, but I don't know Scala well
>> enough to suss out the issue.
>>
>>
>> Is this a bug?
>>
>> My UDF:
>>
>> @F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
>> def groupedPercentile(df):
>>     """
>>     Pandas UDF to apply binary search for a group of records.
>>     """
>>
>>     def getPercentile(x):
>>         """
>>         Given a scalar v and a 1000-length vector of quantiles q
>>         produce the percentile of the distribution most closely
>>         corresponding to v's position in q
>>         """
>>
>>         v = x['scalar_value']
>>         q = x['quantilelist']
>>
>>         # the vector is length 1000 so for the sake of simplicity
>>         # we're going to pretend it's actually 1024
>>
>>         q1024 = []
>>         q1024.extend(q.tolist())
>>         q1024.extend([q[-1]]*24)
>>
>>         start = 0
>>         end = 1024
>>
>>         while start != end:
>>
>>             half_len = int((end - start) / 2)
>>
>>             if v > q1024[start + half_len]:
>>                 start = (end - half_len)
>>             else:
>>                 end = (end - half_len)
>>
>>         if start > 1000:
>>             start = 1000
>>
>>         return start
>>
>>     try:
>>         df.loc[:,'feature_value'] = df.apply(getPercentile,axis=1)
>>         df.loc[:,'error'] = [None]*df.shape[0]
>>
>>     except Exception as e:
>>         df.loc[:,'feature_value'] = [None]*df.shape[0]
>>         df.loc[:,'error'] = [str(e)]*df.shape[0]
>>
>>     finally:
>>         return df[['feature_value','error']]
>>
>

Reply via email to