This is a known issue. See
https://issues.apache.org/jira/browse/SPARK-7902 -Xiangrui
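
Until that's fixed, one possible workaround (a sketch only, using the df built
in the example below): skip the SQL UDF and work on the DataFrame's underlying
RDD, where the Vector comes through intact.

# Workaround sketch: map over df.rdd instead of calling a SQL UDF.
# row.features arrives as a proper Vector here, so sum() behaves as expected.
sums = df.rdd.map(lambda row: (row.id, float(sum(row.features)))).collect()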

On Thu, Jun 18, 2015 at 6:41 AM, calstad <colin.als...@gmail.com> wrote:
> I am having trouble using a UDF on a column of Vectors in PySpark; the
> problem can be illustrated as follows:
>
> from pyspark import SparkContext
> from pyspark.sql import Row
> from pyspark.sql.types import DoubleType
> from pyspark.sql.functions import udf
> from pyspark.mllib.linalg import Vectors
>
> FeatureRow = Row('id', 'features')
> data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
>                        (1, Vectors.dense([2.25, -11.1, 123.2])),
>                        (2, Vectors.dense([-7.2, 1.0, -3.2]))])
> df = data.map(lambda r: FeatureRow(*r)).toDF()
>
> vector_udf = udf(lambda vector: sum(vector), DoubleType())
>
> df.withColumn('feature_sums', vector_udf(df.features)).first()
>
> This fails with the following stack trace:
>
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage
> 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException:
> Traceback (most recent call last):
>   File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py",
> line 111, in main
>     process()
>   File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py",
> line 106, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
> 263, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469,
> in <lambda>
>     func = lambda _, it: map(lambda x: f(*x), it)
>   File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line
> 143, in <lambda>
> TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
>
>
> Looking at what gets passed to the UDF, something strange shows up: the
> argument should be a Vector, but the UDF instead receives a Python tuple
> like this:
>
> (1, None, None, [9.7, 1.0, -3.2])
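>
> (To confirm what the UDF actually receives, here is a quick probe, a sketch
> only; the StringType import is just for this check.)
>
> from pyspark.sql.types import StringType
> probe_udf = udf(lambda v: repr(v), StringType())
> df.withColumn('probe', probe_udf(df.features)).first()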
>
> Is it not possible to use UDFs on DataFrame columns of Vectors?
