Hi guys,

I have a problem about spark DataFrame. My spark version is 1.6.1.
Basically, i used udf and df.withColumn to create a "new" column, and then
i filter the values on this new columns and call show(action). I see the
udf function (which is used to by withColumn to create the new column) is
called twice(duplicated). And if filter on "old" column, udf only run once
which is expected. I attached the example codes, line 30~38 shows the
problem.

 Anyone knows the internal reason? Can you give me any advices? Thank you
very much.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string,
old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+



Best wishes.
By Linbo

Reply via email to