Julien Peloton created SPARK-29367:
--------------------------------------

             Summary: pandas udf not working with latest pyarrow release 
(0.15.0)
                 Key: SPARK-29367
                 URL: https://issues.apache.org/jira/browse/SPARK-29367
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.3, 2.4.1, 2.4.0
            Reporter: Julien Peloton


Hi,

I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my 
pyspark jobs using pandas udf are failing with 
java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). 
Here is a full example to reproduce the failure with pyarrow 0.15:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

import pandas as pd

@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
    """ Apply simple quality cuts

    Returns
    ----------
    out: pandas.Series of booleans
    Return a Pandas DataFrame with the appropriate flag: false for bad alert,
        and true for good alert.

    """
    mask = nbad.values == 0
    mask *= rb.values >= 0.55
    mask *= abs(magdiff.values) <= 0.1

    return pd.Series(mask)


spark = SparkSession.builder.getOrCreate()

# Create dummy DF
colnames = ["nbad", "rb", "magdiff"]
df = spark.sparkContext.parallelize(
    zip(
        [0, 1, 0, 0],
        [0.01, 0.02, 0.6, 0.01],
        [0.02, 0.05, 0.1, 0.01]
    )
).toDF(colnames)

df.show()

# Apply cuts
df = df\
    .withColumn("toKeep", qualitycuts(*colnames))\
    .filter("toKeep == true")\
    .drop("toKeep")

# This will fail if latest pyarrow 0.15.0 is used
df.show()
{code}

and the log is:

{code}
Driver stacktrace:
19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at 
NativeMethodAccessorImpl.java:0, took 0.660523 s
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", 
line 44, in <module>
    df.show()
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
 line 378, in show
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
 line 1257, in __call__
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 
5, localhost, executor driver): java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at 
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
        at 
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at 
org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
        at 
org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
        at 
org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
        at 
org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
        at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
        at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at 
org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
        at 
org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
        at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
        at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

I am not sure what is the root of this failure, but I note there is a ticket 
opened (https://issues.apache.org/jira/browse/ARROW-6429) suggesting some work 
ongoing on the Spark side.

I guess any user upgrading pyarrow would face the same error right away, and 
any help or feedback would be appreciated.

Thanks,
Julien



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to