[ https://issues.apache.org/jira/browse/SPARK-22023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-22023. ---------------------------------- Resolution: Invalid > Multi-column Spark SQL UDFs broken in Python 3 > ---------------------------------------------- > > Key: SPARK-22023 > URL: https://issues.apache.org/jira/browse/SPARK-22023 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 2.2.0 > Environment: Python3 > Reporter: Oli Hall > > I've been testing some existing PySpark code after migrating to Python3, and > there seems to be an issue with multi-column UDFs in Spark SQL. Essentially, > any UDF that takes in more than one column as input fails with an error > relating to expansion of an underlying lambda expression: > {code} > org.apache.spark.api.python.PythonException: Traceback (most recent call > last): > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 177, > in main > process() > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 172, > in process > serializer.dump_stream(func(split_index, iterator), outfile) > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line > 237, in dump_stream > self.serializer.dump_stream(self._batched(iterator), stream) > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line > 138, in dump_stream > for obj in iterator: > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line > 226, in _batched > for item in iterator: > File "<string>", line 1, in <lambda> > File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 71, > in <lambda> > return lambda *a: f(*a) > TypeError: <lambda>() takes 1 positional argument but 2 were given > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) > at > org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) > at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) > at > org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144) > at > org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > This syntax should (and does) work in Python 2, but is not valid in Python 3, > I believe. > I have a minimal example that reproduces the error, running in the PySpark > shell, with Python 3.6.2, Spark 2.2: > {code} > >>> from pyspark.sql.functions import udf > >>> from pyspark.sql.types import LongType > >>> > >>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, > >>> 'b': 2}])) > /Users/oli-hall/Documents/Code/spark2/python/pyspark/sql/session.py:351: > UserWarning: Using RDD of dict to inferSchema is deprecated. Use > pyspark.sql.Row instead > warnings.warn("Using RDD of dict to inferSchema is deprecated. " > >>> df.printSchema() > root > |-- a: long (nullable = true) > |-- b: long (nullable = true) > >>> sum_udf = udf(lambda x: x[0] + x[1], LongType()) > >>> > >>> with_sum = df.withColumn('sum', sum_udf('a', 'b')) > >>> > >>> with_sum.first() > 17/09/15 11:43:56 ERROR executor.Executor: Exception in task 2.0 in stage 3.0 > (TID 8) > ... (error snipped) > TypeError: <lambda>() takes 1 positional argument but 2 were given > {code} > I've managed to work around it for now, by pushing the input columns into a > struct, then modifying the UDF to read from the struct, but it'd be good if I > could use multi-column input again. > Workaround: > {code} > >>> from pyspark.sql.functions import udf, struct > >>> from pyspark.sql.types import LongType > >>> > >>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, > >>> 'b': 2}])) > >>> > >>> sum_udf = udf(lambda x: x.a + x.b, LongType()) > >>> > >>> with_sum = df.withColumn('temp_struct', struct('a', > >>> 'b')).withColumn('sum', sum_udf('temp_struct')) > >>> with_sum.first() > Row(a=1, b=1, temp_struct=Row(a=1, b=1), sum=2) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org