Linbo updated SPARK-15282:
--------------------------
Attachment: (was: Screen Shot 2016-05-22 at 22.18.02.png)

UDF function executed twice when filter on new column created by withColumn
----------------------------------------------------------------------------

                Key: SPARK-15282
                URL: https://issues.apache.org/jira/browse/SPARK-15282
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 1.6.1
        Environment: spark 1.6.1
           Reporter: Linbo

I found this problem on Spark 1.6.1, and according to [~tedyu] the behavior on the current master branch is the same.

Basically, I used a UDF with df.withColumn to create a "new" column, then filtered on that new column and called show (an action). The UDF (the one withColumn uses to create the new column) is executed twice, i.e. duplicated. If I instead filter on an "old" column, the UDF runs only once, as expected. The example code is below; `filteredOnNewColumnDF.show` exhibits the problem.

{code:title=spark-shell|borderStyle=solid}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old", "old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => { println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+

scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+

scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+
{code}

Updated: *users can avoid this duplicated execution by making sure the UDF is deterministic.* Refer to https://github.com/apache/spark/pull/13087
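As a side note: on Spark 2.3 or later (not the 1.6.1 this issue was reported against), a UDF can be marked non-deterministic via asNondeterministic(), which should stop the optimizer from duplicating the call across the filter and the projection. A minimal sketch, reusing df from the session above:

{code:title=spark-shell|borderStyle=solid}
import org.apache.spark.sql.functions._

// asNondeterministic() (added in Spark 2.3) tells the optimizer the UDF may
// return different values per call, so the filter on "new" is not rewritten
// into a second invocation of the UDF.
val udfFunc = udf((s: String) => { println(s"running udf($s)"); s }).asNondeterministic()

val newDF = df.withColumn("new", udfFunc(df("old")))
newDF.filter("new <> 'a1'").show()   // "running udf(...)" should print once per row
{code}

The duplication itself can be confirmed with filteredOnNewColumnDF.explain(true): the UDF call shows up in both the Filter and the Project of the plan.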
For our particular use case, I want to add more detail. In our project, we first generate a DataFrame with one column called "fileName" and one column called "url", then use a UDF (inside withColumn()) to download the file behind each URL, and filter out '{}' rows before writing to HDFS:

{code:title=spark-shell|borderStyle=solid}
// df: DataFrame["fileName", "url"]
val getDataUDF = udf((url: String) => {
  try {
    // "download data" in the original report is pseudocode; Source.fromURL is
    // only a stand-in for the real download logic.
    scala.io.Source.fromURL(url).mkString
  } catch { case e: Exception =>
    "{}"
  }
})
val df2 = df.withColumn("data", getDataUDF(df("url")))
  .filter("data <> '{}'")
df2.write.save("hdfs path")
{code}

Based on our logs, each file is downloaded twice, and the writing job with the filter takes twice as long as the one without it.

The other problem is data correctness. Because each file is downloaded twice, we came across cases where the first download (getDataUDF) returns real data (not '{}') and the second returns '{}' because of a connection exception. The filter is evaluated only against the first returned value, so Spark does not remove the row, yet the value kept in the "data" column is '{}', the second returned value. Even after the filter, the result DataFrame df2 looks like the following (the file2 row with '{}' data should have been removed):

||fileName||url||data||
|file1|url1|sth|
|file2|url2|'{}'|

*So at a high level, we still get '{}' rows after filtering out '{}', which is wrong. The reason, I think, is that the UDF is executed twice when filtering on a new column created by withColumn, and the two returned values differ: the first makes the filter condition true and the second makes it false. The DataFrame keeps the second value, which should not survive the filter.*
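On 1.6.x, where a UDF cannot be marked non-deterministic through the public API, one way to avoid both the double download and the inconsistent filter result is to materialize the "data" column once before filtering, for example by persisting the intermediate DataFrame. A minimal sketch under that assumption, with getDataUDF and df as above ("hdfs path" is a placeholder):

{code:title=spark-shell|borderStyle=solid}
import org.apache.spark.storage.StorageLevel

// Compute the downloaded column once and cache it; the filter then scans the
// cached rows instead of re-invoking getDataUDF during predicate evaluation.
val withData = df.withColumn("data", getDataUDF(df("url")))
  .persist(StorageLevel.MEMORY_AND_DISK)

val df2 = withData.filter("data <> '{}'")
df2.write.save("hdfs path")

withData.unpersist()
{code}

Writing the intermediate DataFrame out to storage and reading it back should achieve the same effect if caching the downloaded payloads is too memory-heavy.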