PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8. I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in the last stage of the job regardless of my output type.
The problem I'm trying to solve: I have a column of scalar values, and each row pairs its scalar with a sorted vector of quantiles. I want to replace each scalar with the index of the closest value in that row's vector. The grouping itself is arbitrary; I group only so that I can run a row-wise Python operation over both columns, because even when the same vector appears on many rows it isn't clear how else to make the lookup scale. My input data, the product of a join of Hive tables, has the following schema:

```
root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 |    |-- element: double (containsNull = true)
```

My UDF is at the bottom. I'm using a GROUPED_MAP UDF because I want to operate on two columns at once, and because I want to take advantage of Arrow to avoid serialization overhead. The schema my UDF returns is this:

```python
pos_schema = T.StructType([
    T.StructField('feature_value', T.FloatType(), True),
    T.StructField('error', T.StringType())
])
```

However, when I try to apply my UDF, either with saveAsTable or show(), I get the following exception:

```
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
    at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
    at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
    at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
    at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
    at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
    at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
    at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
```

I assumed it was the result of some bad typing on my part, until I did a test with a degenerate UDF that only returns a column of 1s:

```python
@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)
```

This clearly has only one return column, of type int, yet I get exactly the same exception, with a stack trace identical to the one above (same Float8Vector.setSafe and DoubleWriter.setValue frames).
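In case a single group is simply too big for Arrow's buffers, this is roughly how I'm applying the UDF and checking group sizes. It's a sketch: `joined_df` and `group_key` are stand-ins for my real joined DataFrame and its arbitrary grouping column.

```python
# Sketch: 'joined_df' and 'group_key' are placeholders for the real
# joined DataFrame and my arbitrary grouping column.
result = joined_df.groupby('group_key').apply(groupedPercentileInt)
result.show()

# Each row carries ~1000 doubles in quantilelist (~8 KB of Float8Vector
# data per row), so the largest group's row count bounds how much Arrow
# has to buffer for a single group.
joined_df.groupBy('group_key').count().orderBy(F.desc('count')).show(5)
```

My understanding is that a grouped-map UDF ships each group to the Python worker as a single Arrow batch, so a large enough group could plausibly blow past a per-buffer limit, but I haven't confirmed that's what is happening here.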
What seems very strange is that it's still falling over on double types (Float8Vector, DoubleWriter) even though neither UDF returns a double column. The only doubles anywhere in the job are the elements of the input quantilelist array, and the ArrayWriter frame in the trace makes me suspect the failure happens while writing the input group out to the Python worker, rather than while reading my output back. I tried to look into the underlying Scala code, but I don't know Scala well enough to suss out the issue. Is this a bug?

My UDF:

```python
@F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
def groupedPercentile(df):
    """
    Pandas UDF that applies a binary search to each record in a group.
    """
    def getPercentile(x):
        """
        Given a scalar v and a sorted 1000-length vector of quantiles q,
        return the index in q most closely corresponding to v's position
        in the distribution.
        """
        v = x['scalar_value']
        q = x['quantilelist']

        # The vector is length 1000; for the sake of simplicity, pad it
        # to 1024 with copies of its last element.
        q1024 = q.tolist()
        q1024.extend([q[-1]] * 24)

        # Lower-bound binary search over [start, end): converge on the
        # first index whose quantile is >= v.
        start, end = 0, 1024
        while start < end:
            mid = (start + end) // 2
            if v > q1024[mid]:
                start = mid + 1
            else:
                end = mid

        # Clamp indices that landed in the padding back to 1000.
        if start > 1000:
            start = 1000

        # Cast to match the FloatType declared in pos_schema.
        return float(start)

    try:
        df.loc[:, 'feature_value'] = df.apply(getPercentile, axis=1)
        df.loc[:, 'error'] = [None] * df.shape[0]
    except Exception as e:
        df.loc[:, 'feature_value'] = [None] * df.shape[0]
        df.loc[:, 'error'] = [str(e)] * df.shape[0]
    return df[['feature_value', 'error']]
```
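For what it's worth, the group logic itself seems fine when exercised locally, outside Spark, which is part of why I suspect the Arrow write path rather than my Python. Below is a sketch with made-up data; as far as I can tell, PySpark attaches the undecorated function to the wrapped UDF as `.func`.

```python
import numpy as np
import pandas as pd

# Made-up data shaped like one group: a float32 scalar plus a sorted
# 1000-element vector of doubles per row.
test_df = pd.DataFrame({
    'scalar_value': np.random.rand(100).astype('float32'),
    'quantilelist': [np.sort(np.random.rand(1000)) for _ in range(100)],
})

# .func is the plain Python function underneath the pandas_udf wrapper,
# so this runs the group logic with no Spark or Arrow involved.
print(groupedPercentile.func(test_df).head())
```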