[ https://issues.apache.org/jira/browse/SPARK-12157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399743#comment-15399743 ]
Nicholas Chammas commented on SPARK-12157: ------------------------------------------ It appears that it's not possible to have a UDF that returns a {{Vector}}. For example, consider this UDF: {code} featurize_udf = udf( lambda person1, person2: featurize(person1, person2), ArrayType(elementType=FloatType(), containsNull=False) ) {code} {{featurize()}} returns a {{DenseVector}}, which I understand is a wrapper for some numpy array type. Trying to use this UDF on a DataFrame yields: {code} Traceback (most recent call last): File ".../thing.py", line 134, in <module> .alias('pair_features')) File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 310, in collect File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/usr/local/Cellar/apache-spark/2.0.0/libexec/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o94.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 3.0 failed 1 times, most recent failure: Lost task 21.0 in stage 3.0 (TID 34, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.ml.linalg.DenseVector) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$5.apply(BatchEvalPythonExec.scala:137) at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1$$anonfun$apply$5.apply(BatchEvalPythonExec.scala:136) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} > Support numpy types as return values of Python UDFs > --------------------------------------------------- > > Key: SPARK-12157 > URL: https://issues.apache.org/jira/browse/SPARK-12157 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL > Affects Versions: 1.5.2 > Reporter: Justin Uang > > Currently, if I have a python UDF > {code} > import pyspark.sql.types as T > import pyspark.sql.functions as F > from pyspark.sql import Row > import numpy as np > argmax = F.udf(lambda x: np.argmax(x), T.IntegerType()) > df = sqlContext.createDataFrame([Row(array=[1,2,3])]) > df.select(argmax("array")).count() > {code} > I get an exception that is fairly opaque: > {code} > Caused by: net.razorvine.pickle.PickleException: expected zero arguments for > construction of ClassDict (for numpy.dtype) > at > net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) > at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:701) > at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:171) > at net.razorvine.pickle.Unpickler.load(Unpickler.java:85) > at net.razorvine.pickle.Unpickler.loads(Unpickler.java:98) > at > org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:404) > at > org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1$$anonfun$apply$3.apply(python.scala:403) > {code} > Numpy types like np.int and np.float64 should automatically be cast to the > proper dtypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org