[ https://issues.apache.org/jira/browse/SPARK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15026105#comment-15026105 ]
Yi Tian edited comment on SPARK-9141 at 11/25/15 4:00 AM:
----------------------------------------------------------

[~marmbrus] Here is my code:

{code:scala}
// Assumed from the shell session: sc is a SparkContext, hc is a HiveContext.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rdd = sc.parallelize(1 to 10).map { line =>
  new GenericRow(Array[Any]("a", "b")).asInstanceOf[Row]
}
val df = hc.createDataFrame(rdd,
  StructType(Seq(StructField("a", StringType), StructField("b", StringType))))
val mkArrayUDF = org.apache.spark.sql.functions.udf[Array[String], String, String]((s1: String, s2: String) => {
  println("udf called")
  Array[String](s1, s2)
})
val df2 = df.withColumn("arr", mkArrayUDF(df("a"), df("b")))
val df3 = df2.withColumn("e0", df2("arr")(0)).withColumn("e1", df2("arr")(1))
df3.collect().foreach(println)
{code}

was (Author: tianyi):
[~marmbrus] Here is my codes:
{quote}
val rdd = sc.parallelize(1 to 10).map\{line => new GenericRow(Array\[Any]("a","b")).asInstanceOf\[Row]}
val df = hc.createDataFrame(rdd, StructType(Seq(StructField("a",StringType),StructField("b",StringType))))
val mkArrayUDF = org.apache.spark.sql.functions.udf\[Array\[String],String,String] ((s1: String, s2: String) => \{
println("udf called")
Array\[String](s1, s2)
})
val df2 = df.withColumn("arr",mkArrayUDF(df("a"),df("b")))
val df3 = df2.withColumn("e0", df2("arr")(0)).withColumn("e1", df2("arr")(1))
df3.collect().foreach(println)
{quote}


> DataFrame recomputed instead of using cached parent.
> ----------------------------------------------------
>
>                 Key: SPARK-9141
>                 URL: https://issues.apache.org/jira/browse/SPARK-9141
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: Nick Pritchard
>            Assignee: Michael Armbrust
>            Priority: Blocker
>              Labels: cache, dataframe
>             Fix For: 1.5.0
>
>
> As I understand it, DataFrame.cache() is supposed to work the same as RDD.cache(): repeated operations on the cached frame should use the cached results rather than recompute the entire lineage. However, it seems that some DataFrame operations (e.g. withColumn) change the underlying RDD lineage so that cache doesn't work as expected.
> Below is a Scala example that demonstrates this. First, I define two UDFs that use println so that it is easy to see when they are called. Next, I create a simple data frame with one row and two columns. Then I add a column, cache the result, and call count() to force the computation. Lastly, I add another column, cache it, and call count() again.
> I would have expected the last statement to compute only the last column, since everything else was cached. However, because withColumn() changes the lineage, the whole data frame is recomputed.
> {code}
> // Example UDFs that println when called
> val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
> val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3 }
> // Initial dataset
> val df1 = sc.parallelize(Seq(("a", 1))).toDF("name", "value")
> // Add a column by applying the twice udf
> val df2 = df1.withColumn("twice", twice($"value"))
> df2.cache()
> df2.count() // prints Computed: twice(1)
> // Add a column by applying the triple udf
> val df3 = df2.withColumn("triple", triple($"value"))
> df3.cache()
> df3.count() // prints Computed: twice(1)\nComputed: triple(1)
> {code}
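> One way to check whether a cached parent is actually picked up (a sketch added for illustration, not part of the original report; it assumes Spark 1.4/1.5, where a cache hit shows up as an InMemoryColumnarTableScan in the physical plan) is to print the plan before triggering the computation:
> {code}
> // If df2's cache were being reused, the physical plan of df3 would
> // contain an InMemoryColumnarTableScan instead of the full lineage.
> df3.explain()
> {code}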
> I found a workaround, which helped me understand what was going on behind the scenes, but it doesn't seem like an ideal solution. Basically, I convert to an RDD and then back to a DataFrame, which seems to freeze the lineage. The code below shows the workaround for creating the second data frame so that cache works as expected.
> {code}
> val df2 = {
>   val tmp = df1.withColumn("twice", twice($"value"))
>   sqlContext.createDataFrame(tmp.rdd, tmp.schema)
> }
> {code}
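> A minimal sketch (an illustration under the same assumptions as above, not part of the original report) of the same workaround applied to the third data frame as well, so that each derived frame freezes its lineage before being cached:
> {code}
> val df3 = {
>   val tmp = df2.withColumn("triple", triple($"value"))
>   sqlContext.createDataFrame(tmp.rdd, tmp.schema)
> }
> df3.cache()
> df3.count() // with df2 frozen and cached, this should print only Computed: triple(1)
> {code}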