Re: dataframe udf function will be executed twice when filtering on a new column created by withColumn

2016-05-11 Thread James Hammerton
This may be related to: https://issues.apache.org/jira/browse/SPARK-13773

Regards,

James
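
[Editor's note] The three "running udf" lines in the transcript line up with the likely mechanism behind SPARK-13773: the optimizer inlines the expression that defines the "new" column into the filter predicate, so the udf is evaluated once for the predicate and again for the projected column. A minimal plain-Python sketch (not Spark code; `my_udf` and the variable names are hypothetical stand-ins) reproduces the same call pattern:

```python
# Plain-Python analogue of the inlining behavior: "new" is an expression in
# the query plan, not a materialized value, so filtering on it evaluates the
# udf once for the predicate and once more for the projection.
call_log = []

def my_udf(s):                            # stand-in for the Scala udf
    call_log.append(s)
    return s

rows = [("a", "b"), ("a1", "b1")]

# filter("new <> 'a1'") followed by show(): the guard runs my_udf first,
# then the projection runs it again for rows that pass the filter.
result = [(old, old1, my_udf(old))        # projection re-evaluates the udf
          for (old, old1) in rows
          if my_udf(old) != "a1"]         # filter evaluates the udf

print(call_log)   # ['a', 'a', 'a1'] -- matches the three "running udf" lines
print(result)     # [('a', 'b', 'a')]
```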



Re: dataframe udf function will be executed twice when filtering on a new column created by withColumn

2016-05-11 Thread Ted Yu
In master branch, behavior is the same.

Suggest opening a JIRA if you haven't done so.



dataframe udf function will be executed twice when filtering on a new column created by withColumn

2016-05-11 Thread Tony Jin
Hi guys,

I have a problem with a Spark DataFrame. My Spark version is 1.6.1.
Basically, I used a udf with df.withColumn to create a "new" column, and then
I filtered on this new column and called show (an action). I see that the udf
(the one withColumn uses to create the new column) is called twice
(duplicated). If I filter on an "old" column instead, the udf only runs once,
which is what I expect. I attached the example code; the
filteredOnNewColumnDF.show output below shows the problem.

Does anyone know the internal reason? Can you give me any advice? Thank you
very much.



scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+



Best wishes.
By Linbo
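
[Editor's note] A common workaround for this situation, not discussed in the thread, is to materialize the derived column before filtering so its expression is not re-evaluated; in Spark this would correspond to something like `newDF.persist()` (or `cache()`) before applying the filter, or writing `newDF` out and reading it back. The same plain-Python analogue as above (not Spark code; `my_udf` is a hypothetical stand-in) shows the effect:

```python
# Plain-Python analogue of the workaround: compute the "new" column for every
# row first (materialization), then filter on the already-computed value.
# The udf then runs exactly once per input row.
call_log = []

def my_udf(s):                      # stand-in for the Scala udf
    call_log.append(s)
    return s

rows = [("a", "b"), ("a1", "b1")]

# Step 1: materialize the derived column (analogous to persisting newDF).
materialized = [(old, old1, my_udf(old)) for (old, old1) in rows]

# Step 2: filter on the stored value; no re-evaluation of the udf happens.
filtered = [row for row in materialized if row[2] != "a1"]

print(call_log)   # ['a', 'a1'] -- one call per input row
print(filtered)   # [('a', 'b', 'a')]
```

Note that `persist()` is lazy: the cached column is computed by the first action that touches it, after which subsequent filters read the stored value instead of re-running the udf.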