[ 
https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286951#comment-16286951
 ] 

Wenchen Fan commented on SPARK-15282:
-------------------------------------

Shall we resolve this ticket as now users can mark their UDFs as 
non-deterministic?

> UDF executed twice when filter on new column created by withColumn and the 
> final value may be not correct
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>         Environment: spark 1.6.1
>            Reporter: Linbo
>
> I found this problem on spark version 1.6.1 and based on  [~tedyu] in current 
> master branch, the behavior is the same.
> Basically, i used udf and df.withColumn to create a "new" column, and then i 
> filter the values on this new columns and call show(action). I see the udf 
> function (which is used to by withColumn to create the new column) is called 
> twice(duplicated). And if filter on "old" column, udf only run once which is 
> expected. I attached the example codes,  `filteredOnNewColumnDF.show` shows 
> the problem.
> {code:title=spark-shell|borderStyle=solid}
> scala> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.functions._
> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", 
> "b1"))).toDF("old","old1")
> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
> udfFunc: org.apache.spark.sql.UserDefinedFunction = 
> UserDefinedFunction(<function1>,StringType,List(StringType))
> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: 
> string]
> scala> newDF.show
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> | a1|  b1| a1|
> +---+----+---+
> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
> string, new: string]
> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
> string, new: string]
> scala> filteredOnNewColumnDF.show
> running udf(a)
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> scala> filteredOnOldColumnDF.show
> running udf(a)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> {code}
> *Updated: user-defined functions must be deterministic. Due to optimization, 
> duplicate invocations may be eliminated or the function may even be invoked 
> more times than it is present in the query.* refer to 
> https://github.com/apache/spark/pull/13087
> For our certain use case, I want to add more detail descriptions. In our 
> project, firstly we generated a dataframe with one column called "fileName" 
> one column called "url", and then we use a udf function (used inside 
> withColumn()) to download the files from the corresponding urls and filter 
> out '{}' data before writing to hdfs:
> {code:title=scala|borderStyle=solid}
> // df: DataFrame["fileName", "url"] 
> val getDataUDF = udf((url: String) => {
>     try { 
>        download data
>     } catch { case e: Exception =>
>       "{}"
>     }
>   })
> val df2 = df.withColumn("data", getDataUDF(df("url")))
>             .filter("data <> '{}'")
> df2.write.save("hdfs path")
> {code}
> Based on our logs, each file will be downloaded twice. As for the running 
> time, the writing job with filter will be twice as the one without filter.
> Another problem is about data correctness. Because it's downloaded twice for 
> each file, we came across some cases that the first downloading (getDataUDF) 
> can get data (not '{}'), and the second downloading return '{}' because of 
> certain connection exception. But i found the filter only worked on the first 
> returned value so that spark will not remove this row but the value inside 
> "data" column was '{}' which is the second returned value. *Even after 
> filter, we get the result dataframe df2 like the follows (file2 with '{}' 
> data should be removed)*:
> |fileName|url|data |
> |     file1       |   url1    |    sth  |    
> |     file2       |   url2    |    '{}'   |
> *So on the high level, we still get '{}' data after filtering out '{}', which 
> is strange. The reason I think is that UDF function is executed twice when 
> filter on new column created by withColumn, and two returned values are 
> different: first one makes filter condition true and second one makes filter 
> condition false. The dataframe will keep the second value which in fact 
> should not appear after filter operation.*



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to