PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.

I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in
the last stage of the job regardless of my output type.


The problem I'm trying to solve:
Each row has a scalar value plus a sorted vector of quantiles. I want to
replace each scalar with the index of its position in that row's vector.
The grouping key is arbitrary, and I'm doing the lookup row-wise in Python
because, even though the same vector appears on many rows, it isn't clear
how else I would get the lookup to scale.
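
In per-row terms, the lookup I'm after is essentially a searchsorted over
the row's quantile vector. A minimal sketch of the intent (numpy used here
purely for illustration; my actual UDF, at the bottom, does a hand-rolled
binary search over the same data):

import numpy as np

def closest_index(scalar_value, quantilelist):
    # position of the scalar within the row's sorted quantile vector,
    # i.e. roughly what the binary search in my UDF computes per row
    return int(np.searchsorted(quantilelist, scalar_value))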

My input data, the product of a join of Hive tables, has the following
schema:

root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 |    |-- element: double (containsNull = true)
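
If it helps to reproduce, a toy frame of the same shape can be built like
this (purely illustrative values; `spark` is the session):

from pyspark.sql import Row

toy = spark.createDataFrame([
    Row(scalar_value=3.2, quantilelist=[i / 1000.0 for i in range(1000)]),
    Row(scalar_value=0.7, quantilelist=[i / 1000.0 for i in range(1000)]),
])
toy.printSchema()   # matches the schema above, modulo float vs double on scalar_value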


My UDF is at the bottom of this message. I'm using a GROUPED_MAP UDF because
I need to operate on two columns at once, and because I want to take
advantage of Arrow to avoid serialization overhead.

The schema my UDF returns is this:

pos_schema = T.StructType([
    T.StructField('feature_value',T.FloatType(),True),
    T.StructField('error',T.StringType())
])
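
For context, I invoke the UDF roughly like this (a sketch only: df_in and
the modulo bucketing are stand-ins for my actual job, where the grouping
key is arbitrary anyway):

import pyspark.sql.functions as F

# arbitrary grouping key, as described above
bucketed = df_in.withColumn(
    'group_key',
    F.crc32(F.col('scalar_value').cast('string')) % 64
)

result = bucketed.groupBy('group_key').apply(groupedPercentile)

result.show()
# result.write.saveAsTable('some_table')   # placeholder name; same failure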

However, when I try to apply my UDF and materialize the result, whether with
saveAsTable() or show(), I get the following exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
        at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
        at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
        at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
        at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
        at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

I assumed it was the result of some bad type declarations on my part, until
I tested with a degenerate UDF that does nothing but return a column of 1s:

@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)


This clearly returns only a single integer column, yet I get exactly the
same exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
        at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
        at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
        at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
        at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
        at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

What seems very strange is that it's still falling over on double values
(DoubleWriter writing into a Float8Vector), even though the degenerate UDF
doesn't return any double columns at all. I tried to look into the
underlying code, but I don't know Scala well enough to suss out the issue.


Is this a bug?

My UDF:

@F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
def groupedPercentile(df):
    """
    Pandas UDF to apply binary search for a group of records.
    """

    def getPercentile(x):
        """
        Given a scalar v and a 1000-length vector of quantiles q
        produce the percentile of the distribution most closely
        corresponding to v's position in q
        """

        v = x['scalar_value']
        q = x['quantilelist']

        # the vector is length 1000 so for the sake of simplicity
        # we're going to pretend it's actually 1024

        q1024 = []
        q1024.extend(q.tolist())
        q1024.extend([q[-1]]*24)

        start = 0
        end = 1024

        while start != end:

            half_len = int((end - start) / 2)

            if v > q1024[start + half_len]:
                start = (end - half_len)
            else:
                end = (end - half_len)

        if start > 1000:
            start = 1000

        return start

    try:
        df.loc[:,'feature_value'] = df.apply(getPercentile,axis=1)
        df.loc[:,'error'] = [None]*df.shape[0]

    except Exception as e:
        df.loc[:,'feature_value'] = [None]*df.shape[0]
        df.loc[:,'error'] = [str(e)]*df.shape[0]

    finally:
        return df[['feature_value','error']]
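
For completeness, the aliases used in all of the snippets above are the
usual ones:

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T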
