This may be related to: https://issues.apache.org/jira/browse/SPARK-13773

Regards,

James

On 11 May 2016 at 15:49, Ted Yu <yuzhih...@gmail.com> wrote:

> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>
> On Wed, May 11, 2016 at 6:55 AM, Tony Jin <linbojin...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a problem with Spark DataFrames. My Spark version is 1.6.1.
>> Basically, I used a UDF and df.withColumn to create a "new" column, then
>> filtered on the values of this new column and called show (an action). I
>> see that the UDF (which withColumn uses to create the new column) is
>> called twice (duplicated). If I filter on the "old" column, the UDF runs
>> only once, which is expected. I attached the example code; lines 30~38
>> (the filteredOnNewColumnDF.show call and its output) show the problem.
>>
>> Does anyone know the internal reason? Can you give me any advice? Thank you
>> very much.
>>
>>
>>
>> scala> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.functions._
>>
>> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
>> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
>>
>> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
>> udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))
>>
>> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
>> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> newDF.show
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> | a1|  b1| a1|
>> +---+----+---+
>>
>>
>> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
>> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
>> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
>>
>> scala> filteredOnNewColumnDF.show
>> running udf(a)
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>> scala> filteredOnOldColumnDF.show
>> running udf(a)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>>
>> Best wishes.
>> By Linbo
>>
>>
>
