This may be related to: https://issues.apache.org/jira/browse/SPARK-13773
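A possible workaround, sketched with the names from the example below (I haven't verified this on 1.6.1): cache the DataFrame that carries the UDF-derived column before filtering on it, so the filter reads the materialized "new" values instead of re-evaluating the UDF expression that gets pushed into the predicate.

// Sketch of a workaround, reusing df and udfFunc from the example below.
// Assumption: once the cache is materialized, later filters on "new" read
// the cached column instead of re-running the UDF per row.
val newDF = df.withColumn("new", udfFunc(df("old"))).cache()
newDF.count()                       // materialize the cache; UDF runs once per row
newDF.filter("new <> 'a1'").show()  // reads cached values; no duplicate UDF calls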
Regards,
James

On 11 May 2016 at 15:49, Ted Yu <yuzhih...@gmail.com> wrote:
> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>
> On Wed, May 11, 2016 at 6:55 AM, Tony Jin <linbojin...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a problem with Spark DataFrames. My Spark version is 1.6.1.
>> Basically, I used a UDF with df.withColumn to create a "new" column, and
>> then I filtered on this new column and called show (an action). I see the
>> UDF (the one withColumn uses to create the new column) being called twice
>> (duplicated). If I filter on an "old" column instead, the UDF runs only
>> once, as expected. I attached the example code; the filteredOnNewColumnDF
>> part below shows the problem.
>>
>> Does anyone know the internal reason? Can you give me any advice? Thank
>> you very much.
>>
>> scala> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.functions._
>>
>> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old", "old1")
>> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
>>
>> scala> val udfFunc = udf((s: String) => { println(s"running udf($s)"); s })
>> udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))
>>
>> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
>> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> newDF.show
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> | a1|  b1| a1|
>> +---+----+---+
>>
>> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
>> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
>> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> filteredOnNewColumnDF.show
>> running udf(a)
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>> scala> filteredOnOldColumnDF.show
>> running udf(a)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>> Best wishes.
>> By Linbo
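P.S. To see where the duplication comes from, printing the plans may help; my understanding is that the optimizer pushes the filter below the projection and substitutes the alias, so the UDF expression ends up in both the Filter and the Project of the optimized plan (worth double-checking on 1.6.1):

// Sketch: print the analyzed and optimized plans for the problematic query;
// the UDF call on "old" should show up twice in the optimized plan.
filteredOnNewColumnDF.explain(true)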