[ https://issues.apache.org/jira/browse/SPARK-12157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399743#comment-15399743 ]

Nicholas Chammas commented on SPARK-12157:
------------------------------------------

It appears that it's not possible to have a UDF that returns a {{Vector}}.

For example, consider this UDF:

{code}
    featurize_udf = udf(
        lambda person1, person2: featurize(person1, person2),
        ArrayType(elementType=FloatType(), containsNull=False)
    )
{code}

{{featurize()}} returns a {{DenseVector}}, which I understand is a wrapper around a numpy array.

Trying to use this UDF on a DataFrame yields:

{code}
Traceback (most recent call last):
  File ".../thing.py", line 134, in <module>
    .alias('pair_features'))
  File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 310, in collect
  File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o94.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 3.0 failed 1 times, most recent failure: Lost task 21.0 in stage 3.0 (TID 34, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.ml.linalg.DenseVector)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$5.apply(BatchEvalPythonExec.scala:137)
        at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$5.apply(BatchEvalPythonExec.scala:136)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}
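
One workaround that seems to avoid the pickling error (a minimal sketch, assuming downstream consumers can accept a plain array column instead of a {{Vector}}) is to convert the {{DenseVector}} to a list of native Python floats inside the UDF, so the returned value actually matches the declared {{ArrayType}}:

{code}
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# featurize() is the function from the example above, which returns a
# pyspark.ml.linalg.DenseVector. Converting its output to native Python
# floats keeps the pickler from ever seeing the DenseVector class.
featurize_udf = udf(
    lambda person1, person2: [float(x) for x in featurize(person1, person2).toArray()],
    ArrayType(elementType=FloatType(), containsNull=False)
)
{code}

If a real {{Vector}} column is required, declaring the return type as {{VectorUDT()}} from {{pyspark.ml.linalg}} may also work, though that is a separate question from the numpy casting this ticket asks for.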

> Support numpy types as return values of Python UDFs
> ---------------------------------------------------
>
>                 Key: SPARK-12157
>                 URL: https://issues.apache.org/jira/browse/SPARK-12157
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 1.5.2
>            Reporter: Justin Uang
>
> Currently, if I have a python UDF
> {code}
> import pyspark.sql.types as T
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> import numpy as np
> argmax = F.udf(lambda x: np.argmax(x), T.IntegerType())
> df = sqlContext.createDataFrame([Row(array=[1,2,3])])
> df.select(argmax("array")).count()
> {code}
> I get an exception that is fairly opaque:
> {code}
> Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
>         at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>         at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:701)
>         at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:171)
>         at net.razorvine.pickle.Unpickler.load(Unpickler.java:85)
>         at net.razorvine.pickle.Unpickler.loads(Unpickler.java:98)
>         at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:404)
>         at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:403)
> {code}
> Numpy types like np.int and np.float64 should automatically be cast to the 
> proper dtypes.
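
In the meantime, the usual workaround for the quoted example is to cast the numpy scalar to a native Python type inside the UDF (a minimal sketch, reusing the names from the description above):

{code}
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import Row
import numpy as np

# np.argmax returns a numpy integer scalar; wrapping it in int() yields a
# native Python int, which the pickler can map onto IntegerType.
argmax = F.udf(lambda x: int(np.argmax(x)), T.IntegerType())

df = sqlContext.createDataFrame([Row(array=[1, 2, 3])])
df.select(argmax("array")).count()
{code}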


