[ 
https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Linbo updated SPARK-15282:
--------------------------
    Description: 
I found this problem on Spark 1.6.1, and according to [~tedyu] the behavior is 
the same on the current master branch.
Basically, I used a UDF with df.withColumn to create a "new" column, then 
filtered on the values of this new column and called show() (an action). The 
UDF (the one withColumn uses to compute the new column) is invoked twice, i.e. 
duplicated. If I filter on an "old" column instead, the UDF runs only once, as 
expected. The example code is attached; `filteredOnNewColumnDF.show` 
demonstrates the problem.

{code:title=spark-shell|borderStyle=solid}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction = 
UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+

{code}
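
The duplication is visible in the query plan. As a hedged illustration (the 
plan shapes in the comments below are approximate, not verbatim Spark output): 
the optimizer rewrites the filter predicate in terms of the UDF expression 
itself, so the UDF ends up both in the Filter and in the Project.

{code:title=scala|borderStyle=solid}
// Inspect the plans for the two filtered DataFrames. Exact output differs
// across Spark versions; the sketched plans below are illustrative only.
filteredOnNewColumnDF.explain(true)
// Optimized plan, roughly:
//   Project [old, old1, UDF(old) AS new]   <- UDF runs again for surviving rows
//   +- Filter NOT (UDF(old) = a1)          <- UDF runs for every row
//      +- LogicalRDD [old, old1]

filteredOnOldColumnDF.explain(true)
// Optimized plan, roughly:
//   Project [old, old1, UDF(old) AS new]   <- UDF runs once per surviving row
//   +- Filter NOT (old = a1)               <- no UDF in the predicate
//      +- LogicalRDD [old, old1]
{code}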

*Updated: user-defined functions must be deterministic. Due to optimization, 
duplicate invocations may be eliminated or the function may even be invoked 
more times than it is present in the query.* See 
https://github.com/apache/spark/pull/13087
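
One way to avoid the re-invocation is to materialize the UDF output before 
filtering. This is a minimal sketch, assuming the standard DataFrame 
cache()/count() API; it trades memory for the guarantee that the filter reads 
the already-computed column instead of re-running the UDF:

{code:title=scala|borderStyle=solid}
// Sketch of a workaround: cache the DataFrame that carries the UDF output,
// force it to materialize, and only then filter. The filter then scans the
// cached column values, so udfFunc is evaluated exactly once per row.
val newDF = df.withColumn("new", udfFunc(df("old")))
newDF.cache()
newDF.count()                                  // force materialization
val filteredDF = newDF.filter("new <> 'a1'")   // reads cached values
{code}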

To add more detail for our particular use case: in our project we first 
generate a dataframe with one column called "fileName" and one column called 
"url". We then use a UDF (inside withColumn()) to download the file from each 
corresponding URL, and we filter out '{}' data before writing to HDFS:

{code:title=scala|borderStyle=solid}
import org.apache.spark.sql.functions.udf

// df: DataFrame["fileName", "url"]
val getDataUDF = udf((url: String) => {
  try {
    // download the file contents from the url
    // (illustrative stand-in; the real download logic is project-specific)
    scala.io.Source.fromURL(url).mkString
  } catch { case e: Exception =>
    "{}"  // sentinel value for a failed download
  }
})
val df2 = df.withColumn("data", getDataUDF(df("url")))
            .filter("data <> '{}'")
df2.write.save("hdfs path")
{code}

Based on our logs, each file is downloaded twice. Accordingly, the write job 
with the filter takes about twice as long as the same job without it.

The other problem is data correctness. Because every file is downloaded twice, 
we hit cases where the first download (getDataUDF) succeeds and returns real 
data (not '{}'), while the second returns '{}' because of a connection 
exception. The filter is applied only to the first returned value, so Spark 
does not remove the row, yet the value stored in the "data" column is '{}', 
the second returned value. *Even after the filter, the resulting dataframe df2 
looks like the following (file2, whose data is '{}', should have been 
removed):*

||fileName||url||data||
|file1|url1|sth|
|file2|url2|'{}'|

*So at a high level, we still get '{}' data even after filtering out '{}', 
which is surprising. The reason, I believe, is that the UDF is executed twice 
when filtering on a new column created by withColumn, and the two returned 
values differ: the first makes the filter condition true while the second 
makes it false. The dataframe keeps the second value, which should never 
survive the filter.*
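
The same materialization idea applies here. A hedged sketch (persist()/count() 
as above; the trade-off is memory/disk for the cached column): download once, 
pin the results, then filter and write, so the filter decision and the written 
value come from the same invocation of getDataUDF.

{code:title=scala|borderStyle=solid}
// Sketch: materialize the downloaded data before filtering so each URL is
// fetched exactly once.
import org.apache.spark.storage.StorageLevel

val withData = df.withColumn("data", getDataUDF(df("url")))
withData.persist(StorageLevel.MEMORY_AND_DISK)
withData.count()                        // force a single download pass
val df2 = withData.filter("data <> '{}'")
df2.write.save("hdfs path")
withData.unpersist()
{code}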



> UDF function executed twice when filter on new column created by withColumn
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>         Environment: spark 1.6.1
>            Reporter: Linbo
>


