[ https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Linbo updated SPARK-15282:
--------------------------
Description:
I found this problem on Spark 1.6.1 and, per [~tedyu], the behavior is the same in the current master branch.

Basically, I used a UDF with df.withColumn to create a "new" column, then filtered on that new column and called show() (an action). The UDF that withColumn uses to compute the new column is executed twice per row (duplicated). If I instead filter on an "old" column, the UDF runs only once, as expected. I attached the example code below; `filteredOnNewColumnDF.show` demonstrates the problem.

{code:title=spark-shell|borderStyle=solid}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old", "old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => { println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+

scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+

scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+
{code}

Updated: *users can avoid this duplicated-execution behaviour by making sure the UDF is deterministic.* Refer to https://github.com/apache/spark/pull/13087
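The duplication is consistent with the optimizer pushing the predicate on "new" below the Project, copying the UDF expression into the Filter, so the UDF is evaluated once for the predicate and once more for the output column. For readers who need the UDF to run exactly once on 1.6.x, here is a minimal workaround sketch (my addition, not from the original report): cache the intermediate DataFrame so the filter scans the already-computed column. Plan behaviour can vary across versions, so treat this as a sketch rather than a guaranteed fix.

{code:title=workaround sketch|borderStyle=solid}
// Sketch (assumption): materialize the UDF output once, then filter the cached data.
val newDF = df.withColumn("new", udfFunc(df("old"))).cache()
newDF.count()  // forces one evaluation of the UDF per row into the cache

// Subsequent filters scan the cached "new" column instead of re-invoking the UDF.
val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF.show()
{code}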
For our particular use case, I want to add more detail. In our project, we first generate a DataFrame with a "fileName" column and a "url" column, then use a UDF (inside withColumn()) to download each file from its URL, filter out '{}' rows, and write the result to HDFS:

{code:title=spark-shell|borderStyle=solid}
// df: DataFrame["fileName", "url"]
val getDataUDF = udf((url: String) => {
  try {
    // download the file contents (illustrative; the original report elides this step)
    scala.io.Source.fromURL(url).mkString
  } catch { case e: Exception =>
    "{}"
  }
})

val df2 = df.withColumn("data", getDataUDF(df("url")))
            .filter("data <> '{}'")
df2.write.save("hdfs path")
{code}

Based on our logs, each file is downloaded twice, and the write job with the filter takes roughly twice as long as the one without it:

!Screen Shot 2016-05-22 at 22.19.24.png!
!Screen Shot 2016-05-22 at 22.18.02.png!
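To quantify the duplication without grepping executor logs, here is a small sketch (my addition; `udfCalls` and `countingUDF` are hypothetical names) that counts UDF invocations with the 1.6-era accumulator API. Task retries can inflate accumulator values, so the count is indicative rather than exact.

{code:title=invocation-count sketch|borderStyle=solid}
// Sketch (assumption): wrap the download UDF so every call bumps an accumulator.
val udfCalls = sc.accumulator(0L, "udfCalls")
val countingUDF = udf((url: String) => {
  udfCalls += 1L
  try { scala.io.Source.fromURL(url).mkString } catch { case e: Exception => "{}" }
})

val df2 = df.withColumn("data", countingUDF(df("url"))).filter("data <> '{}'")
df2.write.save("hdfs path")
println(s"UDF invocations: ${udfCalls.value}")  // ~2x the row count when duplicated
{code}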
> UDF function executed twice when filter on new column created by withColumn
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>        Environment: spark 1.6.1
>            Reporter: Linbo
>        Attachments: Screen Shot 2016-05-22 at 22.18.02.png, Screen Shot 2016-05-22 at 22.19.24.png
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org