[ 
https://issues.apache.org/jira/browse/SPARK-18634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Burak Yavuz updated SPARK-18634:
--------------------------------
    Description: 
Exploding the output of a Python UDF in Spark SQL can return incorrect results.

There are 2 cases where, depending on the DataType of the exploded column, the 
result is either corrupt or flat-out wrong. It seems that something goes wrong 
when telling Tungsten the schema of the rows during or after applying the UDF.

Please check the code below for a reproduction.

Notebook: 
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6186780348633019/3425836135165635/4343791953238323/latest.html
{code}
from pyspark.sql.functions import *
from pyspark.sql.types import *

df = spark.range(10)

# Issue #1: Corrupt data
def return_range(value):
  return [(i, str(i)) for i in range(value - 1, value + 1)]

range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
                                                    StructField("string_val", StringType())])))

df.select("id", explode(range_udf(df.id))).show()
+---+--------+
| id|     col|
+---+--------+
|  0|[0,null]|
|  0|[0,null]|
|  1|[0,null]|
|  1|[0,null]|
|  2|[0,null]|
|  2|[0,null]|
|  3|[0,null]|
|  3|[0,null]|
|  4|[0,null]|
|  4|[0,null]|
|  5|[0,null]|
|  5|[0,null]|
|  6|[0,null]|
|  6|[0,null]|
|  7|[0,null]|
|  7|[0,null]|
|  8|[0,null]|
|  8|[0,null]|
|  9|[0,null]|
|  9|[0,null]|
+---+--------+

# Fine if I do the explode in the second step...
df.select("id", range_udf(df.id).alias("range")).select("id", explode("range")).show()
+---+-------+
| id|    col|
+---+-------+
|  0|[-1,-1]|
|  0|  [0,0]|
|  1|  [0,0]|
|  1|  [1,1]|
|  2|  [1,1]|
|  2|  [2,2]|
|  3|  [2,2]|
|  3|  [3,3]|
|  4|  [3,3]|
|  4|  [4,4]|
|  5|  [4,4]|
|  5|  [5,5]|
|  6|  [5,5]|
|  6|  [6,6]|
|  7|  [6,6]|
|  7|  [7,7]|
|  8|  [7,7]|
|  8|  [8,8]|
|  9|  [8,8]|
|  9|  [9,9]|
+---+-------+

# ... or if I don't include the second column
df.select(explode(range_udf(df.id))).show()
+-------+
|    col|
+-------+
|[-1,-1]|
|  [0,0]|
|  [0,0]|
|  [1,1]|
|  [1,1]|
|  [2,2]|
|  [2,2]|
|  [3,3]|
|  [3,3]|
|  [4,4]|
|  [4,4]|
|  [5,5]|
|  [5,5]|
|  [6,6]|
|  [6,6]|
|  [7,7]|
|  [7,7]|
|  [8,8]|
|  [8,8]|
|  [9,9]|
+-------+

# Issue #2: Flat-out wrong answer
def return_range2(value):
  return range(value - 1, value + 1)

range_udf2 = udf(return_range2, ArrayType(IntegerType()))

df.select("id", explode(range_udf2(df.id))).show()
+---+---+
| id|col|
+---+---+
|  0| 24|
|  0| 24|
|  1| 24|
|  1| 24|
|  2| 24|
|  2| 24|
|  3| 24|
|  3| 24|
|  4| 24|
|  4| 24|
|  5| 24|
|  5| 24|
|  6| 24|
|  6| 24|
|  7| 24|
|  7| 24|
|  8| 24|
|  8| 24|
|  9| 24|
|  9| 24|
+---+---+

df.select("id", range_udf2(df.id).alias("range")).select("id", explode("range")).show()
+---+---+
| id|col|
+---+---+
|  0| -1|
|  0|  0|
|  1|  0|
|  1|  1|
|  2|  1|
|  2|  2|
|  3|  2|
|  3|  3|
|  4|  3|
|  4|  4|
|  5|  4|
|  5|  5|
|  6|  5|
|  6|  6|
|  7|  6|
|  7|  7|
|  8|  7|
|  8|  8|
|  9|  8|
|  9|  9|
+---+---+

df.select(explode(range_udf2(df.id))).show()
+---+
|col|
+---+
| -1|
|  0|
|  0|
|  1|
|  1|
|  2|
|  2|
|  3|
|  3|
|  4|
|  4|
|  5|
|  5|
|  6|
|  6|
|  7|
|  7|
|  8|
|  8|
|  9|
+---+

{code}
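
For reference, the rows that the exploded select is expected to return can be computed in plain Python, without Spark. This is only a sketch: it reuses the bodies of return_range and return_range2 from the reproduction (with list(...) added around range so the result is a list on both Python 2 and 3), and expected_rows is a hypothetical helper name introduced here. It models the expected output of select("id", explode(udf(id))), not how Spark evaluates the query.

```python
def return_range(value):
    # Same body as the UDF in the reproduction: a list of (int, str) pairs.
    return [(i, str(i)) for i in range(value - 1, value + 1)]


def return_range2(value):
    # Assumption: list(...) is added so this is a list on Python 2 and 3 alike.
    return list(range(value - 1, value + 1))


def expected_rows(func, ids):
    """Hypothetical helper: one (id, element) row per element of func(id)."""
    return [(i, element) for i in ids for element in func(i)]
```

Comparing expected_rows(return_range, range(10)) and expected_rows(return_range2, range(10)) against the show() output above makes the discrepancy in the single-step queries easy to spot: the exploded column should vary with id rather than repeat a single value.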

  was:
There are some weird issues with exploding Python UDFs in SparkSQL.

There are 2 cases where based on the DataType of the exploded column, the 
result can be flat out wrong, or corrupt. Seems like something bad is happening 
when telling Tungsten the schema of the rows during or after applying the UDF.

Please check the attached notebook for reproduction.




> Corruption and Correctness issues with exploding Python UDFs
> ------------------------------------------------------------
>
>                 Key: SPARK-18634
>                 URL: https://issues.apache.org/jira/browse/SPARK-18634
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Burak Yavuz
>


